def __init__(self, logPath=None, timeout=_REQUEST_TIMEOUT,
logFormatter=None, reactor=None):
"""
@param logFormatter: An object to format requests into log lines for
the access log.
@type logFormatter: L{IAccessLogFormatter} provider
@param reactor: A L{IReactorTime} provider used to compute logging
timestamps.
"""
if not reactor:
from twisted.internet import reactor
self._reactor = reactor
if logPath is not None:
logPath = os.path.abspath(logPath)
self.logPath = logPath
self.timeOut = timeout
if logFormatter is None:
logFormatter = combinedLogFormatter
self._logFormatter = logFormatter
# For storing the cached log datetime and the callback to update it
self._logDateTime = None
self._logDateTimeCall = None
python类provider()的实例源码
def __init__(self, connectedDeferred, wrappedProtocol):
"""
@param connectedDeferred: The L{Deferred} that will callback
with the C{wrappedProtocol} when it is connected.
@param wrappedProtocol: An L{IProtocol} provider that will be
connected.
"""
self._connectedDeferred = connectedDeferred
self._wrappedProtocol = wrappedProtocol
for iface in [interfaces.IHalfCloseableProtocol,
interfaces.IFileDescriptorReceiver,
interfaces.IHandshakeListener]:
if iface.providedBy(self._wrappedProtocol):
directlyProvides(self, iface)
def connect(self, protocolFactory):
"""
Implement L{IStreamClientEndpoint.connect} to launch a child process
and connect it to a protocol created by C{protocolFactory}.
@param protocolFactory: A factory for an L{IProtocol} provider which
will be notified of all events related to the created process.
"""
proto = protocolFactory.buildProtocol(_ProcessAddress())
try:
self._spawnProcess(
_WrapIProtocol(proto, self._executable, self._errFlag),
self._executable, self._args, self._env, self._path, self._uid,
self._gid, self._usePTY, self._childFDs)
except:
return defer.fail()
else:
return defer.succeed(proto)
def __init__(self, reactor, port, backlog, interface):
"""
@param reactor: An L{IReactorTCP} provider.
@param port: The port number used for listening
@type port: int
@param backlog: Size of the listen queue
@type backlog: int
@param interface: The hostname to bind to
@type interface: str
"""
self._reactor = reactor
self._port = port
self._backlog = backlog
self._interface = interface
def __init__(self, reactor, host, port, timeout=30, bindAddress=None):
"""
@param reactor: An L{IReactorTCP} provider
@param host: A hostname, used when connecting
@type host: str
@param port: The port number, used when connecting
@type port: int
@param timeout: The number of seconds to wait before assuming the
connection has failed.
@type timeout: int
@param bindAddress: A (host, port) tuple of local address to bind to,
or None.
@type bindAddress: tuple
"""
self._reactor = reactor
self._host = host
self._port = port
self._timeout = timeout
self._bindAddress = bindAddress
def __init__(self, reactor, port, sslContextFactory,
backlog=50, interface=''):
"""
@param reactor: An L{IReactorSSL} provider.
@param port: The port number used for listening
@type port: int
@param sslContextFactory: An instance of
L{interfaces.IOpenSSLContextFactory}.
@param backlog: Size of the listen queue
@type backlog: int
@param interface: The hostname to bind to, defaults to '' (all)
@type interface: str
"""
self._reactor = reactor
self._port = port
self._sslContextFactory = sslContextFactory
self._backlog = backlog
self._interface = interface
def __init__(self, reactor, path, timeout=30, checkPID=0):
"""
@param reactor: An L{IReactorUNIX} provider.
@param path: The path to the Unix socket file, used when connecting
@type path: str
@param timeout: Number of seconds to wait before assuming the
connection has failed.
@type timeout: int
@param checkPID: If True, check for a pid file to verify that a server
is listening.
@type checkPID: bool
"""
self._reactor = reactor
self._path = path
self._timeout = timeout
self._checkPID = checkPID
def _parseServer(self, reactor, port, backlog=50, interface='::'):
"""
Internal parser function for L{_parseServer} to convert the string
arguments into structured arguments for the L{TCP6ServerEndpoint}
@param reactor: An L{IReactorTCP} provider.
@param port: The port number used for listening
@type port: int
@param backlog: Size of the listen queue
@type backlog: int
@param interface: The hostname to bind to
@type interface: str
"""
port = int(port)
backlog = int(backlog)
return TCP6ServerEndpoint(reactor, port, backlog, interface)
def spawnProcess(self, processProtocol, executable, args=(), env={},
path=None, uid=None, gid=None, usePTY=0, childFDs=None):
"""
@ivar processProtocol: Stores the protocol passed to the reactor.
@return: An L{IProcessTransport} provider.
"""
self.processProtocol = processProtocol
self.executable = executable
self.args = args
self.env = env
self.path = path
self.uid = uid
self.gid = gid
self.usePTY = usePTY
self.childFDs = childFDs
self.processTransport = MemoryProcessTransport()
self.processProtocol.makeConnection(self.processTransport)
return self.processTransport
def retries(timeout=30, intervals=1, clock=reactor):
"""Helper for retrying something, sleeping between attempts.
Returns a generator that yields ``(elapsed, remaining, wait)`` tuples,
giving times in seconds. The last item, `wait`, is the suggested amount of
time to sleep before trying again.
:param timeout: From now, how long to keep iterating, in seconds. This can
be specified as a number, or as an iterable. In the latter case, the
iterator is advanced each time an interval is needed. This allows for
back-off strategies.
:param intervals: The sleep between each iteration, in seconds, an an
iterable from which to obtain intervals.
:param clock: An optional `IReactorTime` provider. Defaults to the
installed reactor.
"""
start = clock.seconds()
end = start + timeout
if isinstance(intervals, Iterable):
intervals = iter(intervals)
else:
intervals = repeat(intervals)
return gen_retries(start, end, intervals, clock)
def getPeer(self):
"""
Get the remote address of this connection.
@return: An L{IAddress} provider.
"""
return self.transport.getPeer()
def getHost(self):
"""
Get the local address of this connection.
@return: An L{IAddress} provider.
"""
return self.transport.getHost()
def registerProducer(self, producer, streaming):
"""
Register to receive data from a producer.
This sets self to be a consumer for a producer. When this object runs
out of data (as when a send(2) call on a socket succeeds in moving the
last data from a userspace buffer into a kernelspace buffer), it will
ask the producer to resumeProducing().
For L{IPullProducer} providers, C{resumeProducing} will be called once
each time data is required.
For L{IPushProducer} providers, C{pauseProducing} will be called
whenever the write buffer fills up and C{resumeProducing} will only be
called when it empties.
@type producer: L{IProducer} provider
@param producer: The L{IProducer} that will be producing data.
@type streaming: L{bool}
@param streaming: C{True} if C{producer} provides L{IPushProducer},
C{False} if C{producer} provides L{IPullProducer}.
@raise RuntimeError: If a producer is already registered.
@return: L{None}
"""
if self._requestProducer is not None:
raise RuntimeError(
"Cannot register producer %s, because producer %s was never "
"unregistered." % (producer, self._requestProducer))
if not streaming:
producer = _PullToPush(producer, self)
self._requestProducer = producer
self._requestProducerStreaming = streaming
if not streaming:
producer.startStreaming()
def __init__(self, proto, executable, errFlag):
"""
@param proto: An L{IProtocol} provider.
@param errFlag: A constant belonging to L{StandardErrorBehavior}
that determines if stderr is logged or dropped.
@param executable: The file name (full path) to spawn.
"""
self.protocol = proto
self.errFlag = errFlag
self.executable = executable
def makeConnection(self, process):
"""
Call L{IProtocol} provider's makeConnection method with an
L{ITransport} provider.
@param process: An L{IProcessTransport} provider.
"""
self.transport = _ProcessEndpointTransport(process)
return self.protocol.makeConnection(self.transport)
def processEnded(self, reason):
"""
If the process ends with L{error.ProcessDone}, this method calls the
L{IProtocol} provider's L{connectionLost} with a
L{error.ConnectionDone}
@see: L{ProcessProtocol.processEnded}
"""
if (reason.check(error.ProcessDone) == error.ProcessDone) and (
reason.value.status == 0):
return self.protocol.connectionLost(
Failure(error.ConnectionDone()))
else:
return self.protocol.connectionLost(reason)
def __init__(self, reactor, port, backlog=50, interface='::'):
"""
@param reactor: An L{IReactorTCP} provider.
@param port: The port number used for listening
@type port: int
@param backlog: Size of the listen queue
@type backlog: int
@param interface: The hostname to bind to, defaults to C{::} (all)
@type interface: str
"""
_TCPServerEndpoint.__init__(self, reactor, port, backlog, interface)
def __init__(self, reactor, host, port, timeout=30, bindAddress=None,
attemptDelay=None):
"""
Create a L{HostnameEndpoint}.
@param reactor: The reactor to use for connections and delayed calls.
@type reactor: provider of L{IReactorTCP}, L{IReactorTime} and either
L{IReactorPluggableNameResolver} or L{IReactorPluggableResolver}.
@param host: A hostname to connect to.
@type host: L{bytes} or L{unicode}
@param port: The port number to connect to.
@type port: L{int}
@param timeout: For each individual connection attempt, the number of
seconds to wait before assuming the connection has failed.
@type timeout: L{int}
@param bindAddress: the local address of the network interface to make
the connections from.
@type bindAddress: L{bytes}
@param attemptDelay: The number of seconds to delay between connection
attempts.
@type attemptDelay: L{float}
@see: L{twisted.internet.interfaces.IReactorTCP.connectTCP}
"""
self._reactor = reactor
[self._badHostname, self._hostBytes, self._hostText] = (
self._hostAsBytesAndText(host)
)
self._hostStr = self._hostBytes if bytes is str else self._hostText
self._port = port
self._timeout = timeout
self._bindAddress = bindAddress
if attemptDelay is None:
attemptDelay = self._DEFAULT_ATTEMPT_DELAY
self._attemptDelay = attemptDelay
def __init__(self, reactor, host, port, sslContextFactory,
timeout=30, bindAddress=None):
"""
@param reactor: An L{IReactorSSL} provider.
@param host: A hostname, used when connecting
@type host: str
@param port: The port number, used when connecting
@type port: int
@param sslContextFactory: SSL Configuration information as an instance
of L{interfaces.IOpenSSLContextFactory}.
@param timeout: Number of seconds to wait before assuming the
connection has failed.
@type timeout: int
@param bindAddress: A (host, port) tuple of local address to bind to,
or None.
@type bindAddress: tuple
"""
self._reactor = reactor
self._host = host
self._port = port
self._sslContextFactory = sslContextFactory
self._timeout = timeout
self._bindAddress = bindAddress
def __init__(self, reactor, fileno, addressFamily):
"""
@param reactor: An L{IReactorSocket} provider.
@param fileno: An integer file descriptor corresponding to a listening
I{SOCK_STREAM} socket.
@param addressFamily: The address family of the socket given by
C{fileno}.
"""
self.reactor = reactor
self.fileno = fileno
self.addressFamily = addressFamily
self._used = False
def _parseServer(self, reactor, domain, index):
"""
Internal parser function for L{_parseServer} to convert the string
arguments for a systemd server endpoint into structured arguments for
L{AdoptedStreamServerEndpoint}.
@param reactor: An L{IReactorSocket} provider.
@param domain: The domain (or address family) of the socket inherited
from systemd. This is a string like C{"INET"} or C{"UNIX"}, ie the
name of an address family from the L{socket} module, without the
C{"AF_"} prefix.
@type domain: C{str}
@param index: An offset into the list of file descriptors inherited from
systemd.
@type index: C{str}
@return: A two-tuple of parsed positional arguments and parsed keyword
arguments (a tuple and a dictionary). These can be used to
construct an L{AdoptedStreamServerEndpoint}.
"""
index = int(index)
fileno = self._sddaemon.inheritedDescriptors()[index]
addressFamily = getattr(socket, 'AF_' + domain)
return AdoptedStreamServerEndpoint(reactor, fileno, addressFamily)
def _loadCAsFromDir(directoryPath):
"""
Load certificate-authority certificate objects in a given directory.
@param directoryPath: a L{unicode} or L{bytes} pointing at a directory to
load .pem files from, or L{None}.
@return: an L{IOpenSSLTrustRoot} provider.
"""
caCerts = {}
for child in directoryPath.children():
if not child.asTextMode().basename().split(u'.')[-1].lower() == u'pem':
continue
try:
data = child.getContent()
except IOError:
# Permission denied, corrupt disk, we don't care.
continue
try:
theCert = Certificate.loadPEM(data)
except SSLError:
# Duplicate certificate, invalid certificate, etc. We don't care.
pass
else:
caCerts[theCert.digest()] = theCert
return trustRootFromCertificates(caCerts.values())
def test_constructor(self):
"""
Stores an L{IProtocol} provider and the flag to log/drop stderr
"""
d = self.ep.connect(self.factory)
self.successResultOf(d)
wpp = self.reactor.processProtocol
self.assertIsInstance(wpp.protocol, StubApplicationProtocol)
self.assertEqual(wpp.errFlag, self.ep._errFlag)
def gen_retries(start, end, intervals, clock=reactor):
"""Helper for retrying something, sleeping between attempts.
Yields ``(elapsed, remaining, wait)`` tuples, giving times in seconds. The
last item, `wait`, is the suggested amount of time to sleep before trying
again.
This function works in concert with `retries`. It's split out so that
`retries` can capture the correct start time rather than the time at which
it is first iterated.
:param start: The start time, in seconds, of this generator. This must be
congruent with the `IReactorTime` argument passed to this generator.
:param end: The desired end time, in seconds, of this generator. This must
be congruent with the `IReactorTime` argument passed to this
generator.
:param intervals: A iterable of intervals, each in seconds, which should
be used as hints for the `wait` value that's generated.
:param clock: An optional `IReactorTime` provider. Defaults to the
installed reactor.
"""
for interval in intervals:
now = clock.seconds()
if now < end:
wait = min(interval, end - now)
yield now - start, end - now, wait
else:
yield now - start, end - now, 0
break
def test_register_behavior(self):
cur_count = len(
configure.get_configurations('plone.server.tests', 'behavior'))
from plone.server.interfaces import IFormFieldProvider
from zope.interface import provider
from zope import schema
@provider(IFormFieldProvider)
class IMyBehavior(Interface):
foobar = schema.Text()
configure.behavior(
title="MyBehavior",
provides=IMyBehavior,
factory="plone.behavior.AnnotationStorage",
for_="plone.server.interfaces.IResource"
)()
self.assertEqual(
len(configure.get_configurations('plone.server.tests', 'behavior')),
cur_count + 1)
class IMyType(Interface):
pass
class MyType(Item):
pass
configure.register_configuration(MyType, dict(
context=ISite,
schema=IMyType,
portal_type="MyType2",
behaviors=[IMyBehavior]
), 'contenttype')
# now test it...
configure.load_configuration(
self.layer.app.app.config, 'plone.server.tests', 'contenttype')
self.layer.app.app.config.execute_actions()
resp = self.layer.requester('GET', '/plone/plone/@types')
response = json.loads(resp.text)
type_ = [s for s in response if s['title'] == 'MyType2'][0]
self.assertTrue('foobar' in type_['definitions']['IMyBehavior']['properties'])
def deterministicResolvingReactor(reactor, expectedAddresses=(),
hostMap=None):
"""
Create a reactor that will deterministically resolve all hostnames it is
passed to the list of addresses given.
@param reactor: An object that we wish to add an
L{IReactorPluggableNameResolver} to.
@type reactor: Any object with some formally-declared interfaces (i.e. one
where C{list(providedBy(reactor))} is not empty); usually C{IReactor*}
interfaces.
@param expectedAddresses: (optional); the addresses expected to be returned
for every address. If these are strings, they should be IPv4 or IPv6
literals, and they will be wrapped in L{IPv4Address} and L{IPv6Address}
objects in the resolution result.
@type expectedAddresses: iterable of C{object} or C{str}
@param hostMap: (optional); the names (unicode) mapped to lists of
addresses (str or L{IAddress}); in the same format as expectedAddress,
which map the results for I{specific} hostnames to addresses.
@return: A new reactor which provides all the interfaces previously
provided by C{reactor} as well as L{IReactorPluggableNameResolver}.
All name resolutions performed with its C{nameResolver} attribute will
resolve reentrantly and synchronously with the given
C{expectedAddresses}. However, it is not a complete implementation as
it does not have an C{installNameResolver} method.
"""
if hostMap is None:
hostMap = {}
hostMap = hostMap.copy()
@provider(IHostnameResolver)
class SimpleNameResolver(object):
@staticmethod
def resolveHostName(resolutionReceiver, hostName, portNumber=0,
addressTypes=None, transportSemantics='TCP'):
resolutionReceiver.resolutionBegan(None)
for expectedAddress in hostMap.get(hostName, expectedAddresses):
if isinstance(expectedAddress, str):
expectedAddress = ([IPv4Address, IPv6Address]
[isIPv6Address(expectedAddress)]
('TCP', expectedAddress, portNumber))
resolutionReceiver.addressResolved(expectedAddress)
resolutionReceiver.resolutionComplete()
@provider(IReactorPluggableNameResolver)
class WithResolver(proxyForInterface(
InterfaceClass('*', tuple(providedBy(reactor)))
)):
nameResolver = SimpleNameResolver()
return WithResolver(reactor)