def message_received(self, unwrapped_message):
"""
message is of type UnwrappedMessage
"""
delay = self._sys_rand.randint(0, self.max_delay)
action = start_action(
action_type=u"send delayed message",
delay=delay,
)
with action.context():
d = deferLater(self.reactor, delay, self.protocol.packet_proxy, unwrapped_message)
DeferredContext(d).addActionFinish()
self._pending_sends.add(d)
def _remove(res, d=d):
self._pending_sends.remove(d)
return res
d.addBoth(_remove)
python类reactor()的实例源码
def test_cannot_listen(self):
"""
When the program is run with an argument and a listen address specified
with a port that we can't listen on (e.g. port 1), a CannotListenError
is expected to be logged and the program should stop.
"""
temp_dir = self.useFixture(TempDir())
yield main(reactor, raw_args=[
temp_dir.path,
'--listen', ':1', # A port we can't listen on
])
# Expect a 'certs' directory to be created
self.assertThat(os.path.isdir(temp_dir.join('certs')), Equals(True))
# Expect a default certificate to be created
self.assertThat(os.path.isfile(temp_dir.join('default.pem')),
Equals(True))
# Expect to be unable to listen
flush_logged_errors(CannotListenError)
def get_events(self, callbacks):
"""
Attach to Marathon's event stream using Server-Sent Events (SSE).
:param callbacks:
A dict mapping event types to functions that handle the event data
"""
d = self.request('GET', path='/v2/events', unbuffered=True, headers={
'Accept': 'text/event-stream',
'Cache-Control': 'no-store'
})
def handler(event, data):
callback = callbacks.get(event)
# Deserialize JSON if a callback is present
if callback is not None:
callback(json.loads(data))
return d.addCallback(
sse_content, handler, reactor=self._reactor, **self._sse_kwargs)
def makeService(self, options):
"""Construct a server using MLLPFactory.
:rtype: :py:class:`twisted.application.internet.StreamServerEndpointService`
"""
from twisted.internet import reactor
from txHL7.mllp import IHL7Receiver, MLLPFactory
receiver_name = options['receiver']
receiver_class = reflect.namedClass(receiver_name)
verifyClass(IHL7Receiver, receiver_class)
factory = MLLPFactory(receiver_class())
multi_service = MultiService()
for port_number in PORTS:
port = "tcp:interface={0}:port={1}".format(HOST, port_number,)
endpoint = endpoints.serverFromString(reactor, port)
server = internet.StreamServerEndpointService(endpoint, factory)
server.setName(u"mllp-{0}-{1}".format(receiver_name, port_number))
multi_service.addService(server)
return multi_service
def listen(description, factory, default=None):
"""Listen on a port corresponding to a description
@type description: C{str}
@type factory: L{twisted.internet.interfaces.IProtocolFactory}
@type default: C{str} or C{None}
@rtype: C{twisted.internet.interfaces.IListeningPort}
@return: the port corresponding to a description of a reliable
virtual circuit server.
See the documentation of the C{parse} function for description
of the semantics of the arguments.
"""
from twisted.internet import reactor
name, args, kw = parse(description, factory, default)
return getattr(reactor, 'listen'+name)(*args, **kw)
def message_received(self, unwrapped_message):
"""
message is of type UnwrappedMessage
"""
self._batch.append(unwrapped_message) # [(destination, sphinx_packet)
if len(self._batch) >= self.threshold_count:
delay = self._sys_rand.randint(0, self.max_delay)
action = start_action(
action_type=u"send delayed message batch",
delay=delay,
)
with action.context():
released = self._batch
self._batch = []
random.shuffle(released)
d = deferLater(self.reactor, delay, self.batch_send, released)
DeferredContext(d).addActionFinish()
self._pending_batch_sends.add(d)
def _remove(res, d=d):
self._pending_batch_sends.remove(d)
return res
d.addBoth(_remove)
def _start_onion_service(self, factory):
def progress(percent, tag, message):
bar = int(percent / 10)
log.debug('[%s%s] %s' % ('#' * bar, '.' * (10 - bar), message))
def setup_complete(port):
port = txtorcon.IHiddenService(port)
self.uri = "http://%s" % (port.getHost().onion_uri)
log.info('I have set up a hidden service, advertised at: %s'
% self.uri)
log.info('locally listening on %s' % port.local_address.getHost())
def setup_failed(args):
log.error('onion service setup FAILED: %r' % args)
endpoint = endpoints.serverFromString(reactor, 'onion:80')
txtorcon.IProgressProvider(endpoint).add_progress_listener(progress)
d = endpoint.listen(factory)
d.addCallback(setup_complete)
d.addErrback(setup_failed)
return d
def connectionMade(self):
logger.info('[%s] Connection received from VNC client', self.id)
factory = protocol.ClientFactory()
factory.protocol = VNCProxyClient
factory.vnc_server = self
factory.deferrable = defer.Deferred()
endpoint = endpoints.clientFromString(reactor, self.factory.vnc_address)
def _established_callback(client):
if self._broken:
client.close()
self.vnc_client = client
self.flush()
def _established_errback(reason):
logger.error('[VNCProxyServer] Connection succeeded but could not establish session: %s', reason)
self.close()
factory.deferrable.addCallbacks(_established_callback, _established_errback)
def _connect_errback(reason):
logger.error('[VNCProxyServer] Connection failed: %s', reason)
self.close()
endpoint.connect(factory).addErrback(_connect_errback)
self.send_ProtocolVersion_Handshake()
def run(self):
"""setup the site, start listening on port, setup the looping call to
:py:meth:`~.update_active_node` every ``self.poll_interval`` seconds,
and start the Twisted reactor"""
# get the active node before we start anything...
self.active_node_ip_port = self.get_active_node()
if self.active_node_ip_port is None:
logger.critical("ERROR: Could not get active vault node from "
"Consul. Exiting.")
raise SystemExit(3)
logger.warning("Initial Vault active node: %s",
self.active_node_ip_port)
site = Site(VaultRedirectorSite(self))
# setup our HTTP(S) listener
if self.tls_factory is not None:
self.listentls(site)
else:
self.listentcp(site)
# setup the update_active_node poll every POLL_INTERVAL seconds
self.add_update_loop()
logger.warning('Starting Twisted reactor (event loop)')
self.run_reactor()
def se_requester(self):
"""
While the reactor is polling, we can't make any requests. So have the
reactor itself make the request and store the result.
"""
logger.debug('requester called; spawning process')
# since Python is single-threaded and Twisted is just event-based,
# we can't do a request and run the redirector from the same script.
# Best choice is to used popen to run an external script to do the
# redirect.
url = 'http://127.0.0.1:%d' % self.cls.bind_port
path = os.path.join(os.path.dirname(__file__), 'requester.py')
self.poller = subprocess.Popen(
[sys.executable, path, url, '/bar/baz', '/vault-redirector-health'],
stdout=subprocess.PIPE,
universal_newlines=True
)
# run a poller loop to check for process stop and get results
self.poller_check_task = task.LoopingCall(self.check_request)
self.poller_check_task.clock = self.cls.reactor
self.poller_check_task.start(0.5)
logger.debug('poller_check_task started')
def check_request(self):
"""
check if the self.poller process has finished; if so, handle results
and stop the poller_check_task. If update_active has also already been
called, stop the reactor.
"""
logger.debug('check_request called')
if self.poller.poll() is None:
logger.debug('poller process still running')
return
# stop the looping task
self.poller_check_task.stop()
assert self.poller.returncode == 0
out, err = self.poller.communicate()
self.response = out.strip()
logger.debug('check_request done; response: %s', self.response)
# on python3, this will be binary
if not isinstance(self.response, str):
self.response = self.response.decode('utf-8')
if self.update_active_called:
self.stop_reactor()
ldapproxy_plugin.py 文件源码
项目:privacyidea-ldap-proxy
作者: NetKnights-GmbH
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def makeService(self, options):
"""
Called by Twisted after having parsed the command-line options.
:param options: ``usage.Options`` instance
:return: the server instance
"""
# Configuration is mandatory
if options['config'] is None:
print 'You need to specify a configuration file via `twistd ldap-proxy -c config.ini`.'
sys.exit(1)
config = load_config(options['config'])
factory = ProxyServerFactory(config)
endpoint_string = serverFromString(reactor, config['ldap-proxy']['endpoint'])
return internet.StreamServerEndpointService(endpoint_string, factory)
def connect_service_account(self):
"""
Make a new connection to the LDAP backend server using the credentials of the service account
:return: A Deferred that fires a `LDAPClient` instance
"""
client = yield connectToLDAPEndpoint(reactor, self.proxied_endpoint_string, LDAPClient)
if self.use_tls:
client = yield client.startTLS()
try:
yield client.bind(self.service_account_dn, self.service_account_password)
except ldaperrors.LDAPException, e:
# Call unbind() here if an exception occurs: Otherwise, Twisted will keep the file open
# and slowly run out of open files.
yield client.unbind()
raise e
defer.returnValue(client)
def test_twisted(pyi_builder):
pyi_builder.test_source(
"""
# Twisted is an event-driven networking engine.
#
# The 'reactor' is object that starts the eventloop.
# There are different types of platform specific reactors.
# Platform specific reactor is wrapped into twisted.internet.reactor module.
from twisted.internet import reactor
# Applications importing module twisted.internet.reactor might fail
# with error like:
#
# AttributeError: 'module' object has no attribute 'listenTCP'
#
# Ensure default reactor was loaded - it has method 'listenTCP' to start server.
if not hasattr(reactor, 'listenTCP'):
raise SystemExit('Twisted reactor not properly initialized.')
""")
def __init__(self, domain, username, pw, server, use_ssl, policy_key=0, server_version="14.0", device_type="iPhone", device_id=None, verbose=False):
self.use_ssl = use_ssl
self.domain = domain
self.username = username
self.password = pw
self.server = server
self.device_id = device_id
if not self.device_id:
self.device_id = str(uuid.uuid4()).replace("-","")[:32]
self.server_version = server_version
self.device_type = device_type
self.policy_key = policy_key
self.folder_data = {}
self.verbose = verbose
self.collection_data = {}
clientContext = WebClientContextFactory()
self.agent = Agent(reactor, clientContext)
self.operation_queue = defer.DeferredQueue()
self.queue_deferred = self.operation_queue.get()
self.queue_deferred.addCallback(self.queue_full)
# Response processing
def test_hpe_create_volume_invalid_provisioning_option(self):
name = 'test-create-volume-fake'
path = b"/VolumeDriver.Create"
body = {u"Name": name,
u"Opts": {u"provisioning": u"fake"}}
headers = Headers({b"content-type": [b"application/json"]})
body_producer = FileBodyProducer(BytesIO(dumps(body)))
agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
body_producer)
d.addCallback(self.checkResponse, json.dumps({
u"Err": "Invalid input received: Must specify a valid " +
"provisioning type ['thin', 'full', " +
"'dedup'], value 'fake' is invalid."}))
d.addCallback(self._remove_volume_callback, name)
d.addErrback(self.cbFailed)
return d
def test_hpe_create_volume_invalid_option(self):
name = 'test-create-volume-fake'
path = b"/VolumeDriver.Create"
body = {u"Name": name,
u"Opts": {u"fake": u"fake"}}
headers = Headers({b"content-type": [b"application/json"]})
body_producer = FileBodyProducer(BytesIO(dumps(body)))
agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
body_producer)
d.addCallback(self.checkResponse, json.dumps({
u"Err": "create volume failed, error is: fake is not a valid "
"option. Valid options are: ['size', 'provisioning', "
"'flash-cache']"}))
d.addCallback(self._remove_volume_callback, name)
d.addErrback(self.cbFailed)
return d
def _get_volume_mount_path(self, body, name):
# NOTE: body arg is the result from last deferred call.
# Python complains about parameter mis-match if you don't include it
# In this test, we need it to compare expected results with Path
# request
# Compare path returned by mount (body) with Get Path request
path = b"/VolumeDriver.Path"
newbody = {u"Name": name}
headers = Headers({b"content-type": [b"application/json"]})
body_producer = FileBodyProducer(BytesIO(dumps(newbody)))
agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
body_producer)
d.addCallback(self.checkResponse, body)
d.addErrback(self.cbFailed)
return d
def _mount_the_volume(self, body, name):
# NOTE: body arg is the result from last deferred call.
# Python complains about parameter mis-match if you don't include it
# Mount the previously created volume
path = b"/VolumeDriver.Mount"
newbody = {u"Name": name}
headers = Headers({b"content-type": [b"application/json"]})
body_producer = FileBodyProducer(BytesIO(dumps(newbody)))
agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
body_producer)
d.addCallback(self.getResponse)
# If we get a valid response from Path request then we assume
# the mount passed.
# TODO: Add additonal logic to verify the mountpath
d.addCallback(self._get_volume_mount_path, name)
return d
def broken_test_hpe_mount_umount_volume(self):
name = 'test-mount-volume'
path = b"/VolumeDriver.Create"
body = {u"Name": name}
# Create a volume to be mounted
headers = Headers({b"content-type": [b"application/json"]})
body_producer = FileBodyProducer(BytesIO(dumps(body)))
agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
body_producer)
d.addCallback(self.checkResponse, json.dumps({u"Err": ''}))
d.addErrback(self.cbFailed)
# Mount the previously created volume
d.addCallback(self._mount_the_volume, name)
# UMount the previously created volume
d.addCallback(self._unmount_the_volume, name)
# Remove the previously created volume
d.addCallback(self._remove_volume_callback, name)
return d
def test_hpe_get_volume(self):
name = 'test-get-volume'
path = b"/VolumeDriver.Create"
body = {u"Name": name}
# Create a volume to be mounted
headers = Headers({b"content-type": [b"application/json"]})
body_producer = FileBodyProducer(BytesIO(dumps(body)))
agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
body_producer)
d.addCallback(self.checkResponse, json.dumps({u"Err": ''}))
d.addErrback(self.cbFailed)
# Get the previously created volume
expected = {u"Volume": {u"Status": {},
u"Mountpoint": '',
u"Name": name},
u"Err": ''}
d.addCallback(self._get_volume, name, expected)
# Remove the previously created volume
d.addCallback(self._remove_volume_callback, name)
return d
def broken_test_hpe_list_volume(self):
name = 'test-list-volume'
path = b"/VolumeDriver.Create"
body = {u"Name": name}
# Create a volume to be mounted
headers = Headers({b"content-type": [b"application/json"]})
body_producer = FileBodyProducer(BytesIO(dumps(body)))
agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
body_producer)
d.addCallback(self.checkResponse, json.dumps({u"Err": ''}))
d.addErrback(self.cbFailed)
# List volumes
expected = {u"Err": '',
u"Volumes": [{u"Mountpoint": '',
u"Name": name}]}
d.addCallback(self._list_volumes, name, expected)
# Remove the previously created volume
d.addCallback(self._remove_volume_callback, name)
return d
def listen(description, factory, default=None):
"""Listen on a port corresponding to a description
@type description: C{str}
@type factory: L{twisted.internet.interfaces.IProtocolFactory}
@type default: C{str} or C{None}
@rtype: C{twisted.internet.interfaces.IListeningPort}
@return: the port corresponding to a description of a reliable
virtual circuit server.
See the documentation of the C{parse} function for description
of the semantics of the arguments.
"""
from twisted.internet import reactor
name, args, kw = parse(description, factory, default)
return getattr(reactor, 'listen'+name)(*args, **kw)
def testValidOptionsRequest(self):
"""
Makes sure that a "regular" OPTIONS request doesn't include the CORS
specific headers in the response.
"""
agent = Agent(reactor)
headers = Headers({'origin': ['http://localhost']})
response = yield agent.request('OPTIONS', self.uri, headers)
# Check we get the correct status.
self.assertEqual(http.OK, response.code)
# Check we get the correct length
self.assertEqual(0, response.length)
# Check we get the right headers back
self.assertTrue(response.headers.hasHeader('Allow'))
self.assertFalse(
response.headers.hasHeader('Access-Control-Allow-Origin'))
self.assertFalse(response.headers.hasHeader('Access-Control-Max-Age'))
self.assertFalse(
response.headers.hasHeader('Access-Control-Allow-Credentials'))
self.assertFalse(
response.headers.hasHeader('Access-Control-Allow-Methods'))
def testViaAgent(self):
"""
This is a manual check of a POST to /objects which uses
L{twisted.web.client.Agent} to make the request. We do not use
txFluidDB because we need to check that a Location header is
received and that we receive both a 'URI' and an 'id' in the JSON
response payload.
"""
URI = self.txEndpoint.getRootURL() + defaults.httpObjectCategoryName
basicAuth = 'Basic %s' % b64encode('%s:%s' % ('testuser1', 'secret'))
headers = Headers({'accept': ['application/json'],
'authorization': [basicAuth]})
agent = Agent(reactor)
response = yield agent.request('POST', URI, headers)
self.assertEqual(http.CREATED, response.code)
self.assertTrue(response.headers.hasHeader('location'))
d = defer.Deferred()
bodyGetter = ResponseGetter(d)
response.deliverBody(bodyGetter)
body = yield d
responseDict = json.loads(body)
self.assertIn('URI', responseDict)
self.assertIn('id', responseDict)
def testQueryUnicodePath(self):
"""A query on a non-existent Unicode tag, should 404. Part of the
point here is to make sure that no other error occurs due to
passing in a Unicode tag path.
"""
path = u'çóñ/???'
query = '%s = "hi"' % path
URI = '%s/%s?query=%s' % (
self.endpoint,
defaults.httpObjectCategoryName,
urllib.quote(query.encode('utf-8')))
headers = Headers({'accept': ['application/json']})
agent = Agent(reactor)
response = yield agent.request('GET', URI, headers)
self.assertEqual(http.NOT_FOUND, response.code)
def testValidCORSRequest(self):
"""
Sanity check to make sure we get the valid headers back for a CORS
based request.
"""
agent = Agent(reactor)
headers = Headers()
# The origin to use in the tests
dummy_origin = 'http://foo.com'
headers.addRawHeader('Origin', dummy_origin)
response = yield agent.request('GET', self.uri, headers)
# Check we get the correct status.
self.assertEqual(http.OK, response.code)
# Check we get the right headers back
self.assertTrue(
response.headers.hasHeader('Access-Control-Allow-Origin'))
self.assertTrue(
response.headers.hasHeader('Access-Control-Allow-Credentials'))
self.assertTrue(
dummy_origin in
response.headers.getRawHeaders('Access-Control-Allow-Origin'))
def testVersionGets404(self):
"""
Version numbers used to be able to be given in API calls, but are
no longer supported.
"""
version = 20100808
URI = '%s/%d/%s/%s' % (
self.endpoint,
version,
defaults.httpNamespaceCategoryName,
defaults.adminUsername)
headers = Headers({'accept': ['application/json']})
agent = Agent(reactor)
response = yield agent.request('GET', URI, headers)
self.assertEqual(http.NOT_FOUND, response.code)
# TODO: Add a test for a namespace that we don't have LIST perm on.
# although that might be done in permissions.py when that finally gets
# added.
def with_config(loop=None):
global config
if loop is not None:
if config.loop is not None and config.loop is not loop:
raise RuntimeError(
"Twisted has only a single, global reactor. You passed in "
"a reactor different from the one already configured "
"in txaio.config.loop"
)
return _TxApi(config)
# NOTE: beware that twisted.logger._logger.Logger copies itself via an
# overriden __get__ method when used as recommended as a class
# descriptor. So, we override __get__ to just return ``self`` which
# means ``log_source`` will be wrong, but we don't document that as a
# key that you can depend on anyway :/
def requestWebObject(self):
parsed = urlparse.urlparse(self.uri)
protocol = parsed[0]
host, port = self.extractHostAndPort(parsed, protocol)
rest = self.extractQuery(parsed)
class_ = self.protocols[protocol]
headers = self.getAllHeaders().copy()
if 'host' not in headers:
headers['host'] = host
log.info('Performing {} request for {}'.format(self.method, self.uri))
self.content.seek(0, 0)
s = self.content.read()
clientFactory = class_(self.method, rest, self.clientproto, headers,
s, self)
self.reactor.connectTCP(host, port, clientFactory)