def _get_shortname_from_docs(auth, types_url):
# Get the server version from types.xsd. We can't necessarily use the service auth type since it may not be the
# same as the auth type for docs.
log.debug('Getting %s with auth type %s', types_url, auth.__class__.__name__)
# Some servers send an empty response if we send 'Connection': 'close' header
from .protocol import BaseProtocol
with requests.sessions.Session() as s:
s.mount('http://', adapter=BaseProtocol.get_adapter())
s.mount('https://', adapter=BaseProtocol.get_adapter())
r = s.get(url=types_url, auth=auth, allow_redirects=False, stream=False)
log.debug('Request headers: %s', r.request.headers)
log.debug('Response code: %s', r.status_code)
log.debug('Response headers: %s', r.headers)
if r.status_code != 200:
raise TransportError('Unexpected HTTP status %s when getting %s (%s)' % (r.status_code, types_url, r.text))
if not is_xml(r.text):
raise TransportError('Unexpected result when getting %s. Maybe this is not an EWS server?%s' % (
types_url,
'\n\n%s[...]' % r.text[:200] if len(r.text) > 200 else '\n\n%s' % r.text if r.text else '',
))
return to_xml(r.text).get('version')
python类sessions()的实例源码
def guess(cls, protocol):
"""
Tries to ask the server which version it has. We haven't set up an Account object yet, so we generate requests
by hand. We only need a response header containing a ServerVersionInfo element.
The types.xsd document contains a 'shortname' value that we can use as a key for VERSIONS to get the API version
that we need in SOAP headers to generate valid requests. Unfortunately, the Exchagne server may be misconfigured
to either block access to types.xsd or serve up a wrong version of the document. Therefore, we only use
'shortname' as a hint, but trust the SOAP version returned in response headers.
To get API version and build numbers from the server, we need to send a valid SOAP request. We can't do that
without a valid API version. To solve this chicken-and-egg problem, we try all possible API versions that this
package supports, until we get a valid response. If we managed to get a 'shortname' previously, we try the
corresponding API version first.
"""
log.debug('Asking server for version info')
# We can't use a session object from the protocol pool for docs because sessions are created with service auth.
auth = get_auth_instance(credentials=protocol.credentials, auth_type=protocol.docs_auth_type)
try:
shortname = cls._get_shortname_from_docs(auth=auth, types_url=protocol.types_url)
log.debug('Shortname according to %s: %s', protocol.types_url, shortname)
except (TransportError, ParseError) as e:
log.info(text_type(e))
shortname = None
api_version = VERSIONS[shortname][0] if shortname else None
return cls._guess_version_from_service(protocol=protocol, hint=api_version)
def close(self):
log.debug('Server %s: Closing sessions', self.server)
while True:
try:
self._session_pool.get(block=False).close()
except Empty:
break
def get_session(self):
_timeout = 60 # Rate-limit messages about session starvation
while True:
try:
log.debug('Server %s: Waiting for session', self.server)
session = self._session_pool.get(timeout=_timeout)
log.debug('Server %s: Got session %s', self.server, session.session_id)
return session
except Empty:
# This is normal when we have many worker threads starving for available sessions
log.debug('Server %s: No sessions available for %s seconds', self.server, _timeout)
def release_session(self, session):
# This should never fail, as we don't have more sessions than the queue contains
log.debug('Server %s: Releasing session %s', self.server, session.session_id)
try:
self._session_pool.put(session, block=False)
except Full:
log.debug('Server %s: Session pool was already full %s', self.server, session.session_id)
def __init__(self, *args, **kwargs):
version = kwargs.pop('version', None)
super(Protocol, self).__init__(*args, **kwargs)
scheme = 'https' if self.has_ssl else 'http'
self.wsdl_url = '%s://%s/EWS/Services.wsdl' % (scheme, self.server)
self.messages_url = '%s://%s/EWS/messages.xsd' % (scheme, self.server)
self.types_url = '%s://%s/EWS/types.xsd' % (scheme, self.server)
# Autodetect authentication type if necessary
if self.auth_type is None:
self.auth_type = get_service_authtype(service_endpoint=self.service_endpoint, versions=API_VERSIONS,
name=self.credentials.username)
# Default to the auth type used by the service. We only need this if 'version' is None
self.docs_auth_type = self.auth_type
# Try to behave nicely with the Exchange server. We want to keep the connection open between requests.
# We also want to re-use sessions, to avoid the NTLM auth handshake on every request.
self._session_pool = LifoQueue(maxsize=self.SESSION_POOLSIZE)
for _ in range(self.SESSION_POOLSIZE):
self._session_pool.put(self.create_session(), block=False)
if version:
isinstance(version, Version)
self.version = version
else:
# Version.guess() needs auth objects and a working session pool
try:
# Try to get the auth_type of 'types.xsd' so we can fetch it and look at the version contained there
self.docs_auth_type = get_docs_authtype(docs_url=self.types_url)
except TransportError:
pass
self.version = Version.guess(self)
# Used by services to process service requests that are able to run in parallel. Thread pool should be
# larger than the connection pool so we have time to process data without idling the connection.
# Create the pool as the last thing here, since we may fail in the version or auth type guessing, which would
# leave open threads around to be garbage collected.
thread_poolsize = 4 * self.SESSION_POOLSIZE
self.thread_pool = ThreadPool(processes=thread_poolsize)
def test_basicauth_with_netrc(self, httpbin):
auth = ('user', 'pass')
wrong_auth = ('wronguser', 'wrongpass')
url = httpbin('basic-auth', 'user', 'pass')
old_auth = requests.sessions.get_netrc_auth
try:
def get_netrc_auth_mock(url):
return auth
requests.sessions.get_netrc_auth = get_netrc_auth_mock
# Should use netrc and work.
r = requests.get(url)
assert r.status_code == 200
# Given auth should override and fail.
r = requests.get(url, auth=wrong_auth)
assert r.status_code == 401
s = requests.session()
# Should use netrc and work.
r = s.get(url)
assert r.status_code == 200
# Given auth should override and fail.
s.auth = wrong_auth
r = s.get(url)
assert r.status_code == 401
finally:
requests.sessions.get_netrc_auth = old_auth
def test_basicauth_with_netrc(self, httpbin):
auth = ('user', 'pass')
wrong_auth = ('wronguser', 'wrongpass')
url = httpbin('basic-auth', 'user', 'pass')
old_auth = requests.sessions.get_netrc_auth
try:
def get_netrc_auth_mock(url):
return auth
requests.sessions.get_netrc_auth = get_netrc_auth_mock
# Should use netrc and work.
r = requests.get(url)
assert r.status_code == 200
# Given auth should override and fail.
r = requests.get(url, auth=wrong_auth)
assert r.status_code == 401
s = requests.session()
# Should use netrc and work.
r = s.get(url)
assert r.status_code == 200
# Given auth should override and fail.
s.auth = wrong_auth
r = s.get(url)
assert r.status_code == 401
finally:
requests.sessions.get_netrc_auth = old_auth
def test_basicauth_with_netrc(self, httpbin):
auth = ('user', 'pass')
wrong_auth = ('wronguser', 'wrongpass')
url = httpbin('basic-auth', 'user', 'pass')
old_auth = requests.sessions.get_netrc_auth
try:
def get_netrc_auth_mock(url):
return auth
requests.sessions.get_netrc_auth = get_netrc_auth_mock
# Should use netrc and work.
r = requests.get(url)
assert r.status_code == 200
# Given auth should override and fail.
r = requests.get(url, auth=wrong_auth)
assert r.status_code == 401
s = requests.session()
# Should use netrc and work.
r = s.get(url)
assert r.status_code == 200
# Given auth should override and fail.
s.auth = wrong_auth
r = s.get(url)
assert r.status_code == 401
finally:
requests.sessions.get_netrc_auth = old_auth
def test_basicauth_with_netrc(self, httpbin):
auth = ('user', 'pass')
wrong_auth = ('wronguser', 'wrongpass')
url = httpbin('basic-auth', 'user', 'pass')
old_auth = requests.sessions.get_netrc_auth
try:
def get_netrc_auth_mock(url):
return auth
requests.sessions.get_netrc_auth = get_netrc_auth_mock
# Should use netrc and work.
r = requests.get(url)
assert r.status_code == 200
# Given auth should override and fail.
r = requests.get(url, auth=wrong_auth)
assert r.status_code == 401
s = requests.session()
# Should use netrc and work.
r = s.get(url)
assert r.status_code == 200
# Given auth should override and fail.
s.auth = wrong_auth
r = s.get(url)
assert r.status_code == 401
finally:
requests.sessions.get_netrc_auth = old_auth