def unique_everseen(iterable, key=None):
    """Yield each distinct element of ``iterable`` once, in first-seen order.

    Every element (or its ``key(element)`` value when ``key`` is given) is
    remembered for the life of the generator, so a repeat is dropped no
    matter how far apart the duplicates are.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    for candidate in iterable:
        # Without a key function the element itself is the identity marker.
        fingerprint = candidate if key is None else key(candidate)
        if fingerprint in observed:
            continue
        observed.add(fingerprint)
        yield candidate
# Example source code for Python's moves() class (six.moves)
def unique_everseen(iterable, key=None):
    """Yield each distinct element of ``iterable`` once, in first-seen order.

    Every element (or its ``key(element)`` value when ``key`` is given) is
    remembered for the life of the generator, so a repeat is dropped no
    matter how far apart the duplicates are.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    for candidate in iterable:
        # Without a key function the element itself is the identity marker.
        fingerprint = candidate if key is None else key(candidate)
        if fingerprint in observed:
            continue
        observed.add(fingerprint)
        yield candidate
def test_move_items(item_name):
    """Ensure that everything loads correctly.

    Looks up ``item_name`` on ``six.moves`` and, when the result is a module,
    also imports it by its dotted ``six.moves.<name>`` path. Known
    platform/version gaps are turned into skips instead of failures.
    """
    # NOTE(review): the nesting below was reconstructed from a copy of the
    # source that had lost its indentation -- confirm against the upstream file.
    try:
        item = getattr(six.moves, item_name)
        if isinstance(item, types.ModuleType):
            __import__("six.moves." + item_name)
    except AttributeError:
        # As laid out here, any other AttributeError is not re-raised; the
        # final membership assertion is what reports the problem.
        if item_name == "zip_longest" and sys.version_info < (2, 6):
            py.test.skip("zip_longest only available on 2.6+")
    except ImportError:
        # Each skip() call raises, so reaching the bare ``raise`` means none of
        # the known-missing cases applied and the ImportError is genuine.
        if item_name == "winreg" and not sys.platform.startswith("win"):
            py.test.skip("Windows only module")
        if item_name.startswith("tkinter"):
            if not have_tkinter:
                py.test.skip("requires tkinter")
            if item_name == "tkinter_ttk" and sys.version_info[:2] <= (2, 6):
                py.test.skip("ttk only available on 2.7+")
        if item_name.startswith("dbm_gnu") and not have_gdbm:
            py.test.skip("requires gdbm")
        raise
    # The dir() membership check is only made on 2.6 and later.
    if sys.version_info[:2] >= (2, 6):
        assert item_name in dir(six.moves)
def test_python_2_unicode_compatible():
    """Check the text/bytes conversions of a decorated class.

    A class defining ``__str__`` (text) and ``__bytes__`` is wrapped with
    ``six.python_2_unicode_compatible``; the conversions available on the
    running major version must then give the matching text or bytes value.
    """
    @six.python_2_unicode_compatible
    class Greeter(object):
        def __str__(self):
            return six.u('hello')

        def __bytes__(self):
            return six.b('hello')

    greeter = Greeter()

    if six.PY2:
        assert str(greeter) == six.b("hello")
        assert unicode(greeter) == six.u("hello")
    elif six.PY3:
        assert bytes(greeter) == six.b("hello")
        assert str(greeter) == six.u("hello")

    # Use the builtin ``bytes`` when six.moves.builtins exposes one, else ``str``.
    to_bytes = getattr(six.moves.builtins, 'bytes', str)
    assert to_bytes(greeter) == six.b("hello")
def unique_everseen(iterable, key=None):
    """Yield each distinct element of ``iterable`` once, in first-seen order.

    Every element (or its ``key(element)`` value when ``key`` is given) is
    remembered for the life of the generator, so a repeat is dropped no
    matter how far apart the duplicates are.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    for candidate in iterable:
        # Without a key function the element itself is the identity marker.
        fingerprint = candidate if key is None else key(candidate)
        if fingerprint in observed:
            continue
        observed.add(fingerprint)
        yield candidate
def __init__(self, service, config, signer, type_mapping):
    """Set up the client from a validated config.

    :param service: Service identifier passed to ``regions.endpoint_for``.
    :param config: Mapping read for ``region``, ``endpoint``,
        ``additional_user_agent`` and ``log_requests``.
    :param signer: Stored as ``self.signer``.
    :param type_mapping: Stored as the complex type mappings and merged with
        ``self.primitive_type_map`` to form ``self.type_mappings``.
    """
    validate_config(config)

    self.signer = signer
    self.endpoint = regions.endpoint_for(
        service,
        region=config.get("region"),
        endpoint=config.get("endpoint"),
    )

    self.complex_type_mappings = type_mapping
    self.type_mappings = merge_type_mappings(self.primitive_type_map, type_mapping)

    self.session = requests.Session()
    extra_user_agent = get_config_value_or_default(config, "additional_user_agent")
    self.user_agent = build_user_agent(extra_user_agent)

    # One logger per client instance, named after the module and id(self).
    logger_name = "{}.{}".format(__name__, id(self))
    self.logger = logging.getLogger(logger_name)
    self.logger.addHandler(logging.NullHandler())

    request_logging = get_config_value_or_default(config, "log_requests")
    if request_logging:
        self.logger.setLevel(logging.DEBUG)
    # debuglevel is assigned on the HTTPConnection class itself, not on an
    # instance, so the setting is not limited to this client.
    six.moves.http_client.HTTPConnection.debuglevel = 1 if request_logging else 0
def run(self):
    """Consume items from the pool queue until stopped or drained.

    The loop keeps going while ``self.kill`` is false and either
    ``self.running`` is true or the pool queue still holds items. Each item
    is given a sequence number taken from ``pool.queue_processed`` and then
    passed to ``self.process(item, num)``.

    If ``process`` raises, the worker sets ``self.failed``, appends
    ``sys.exc_info()`` to ``pool.exceptions`` and calls ``pool.kill()``; the
    exception is not propagated.
    """
    while (not self.kill) and (self.running or not self.pool.queue.empty()):
        try:
            item = self.pool.queue.get(timeout=self.get_timeout)
        except six.moves.queue.Empty:
            # Nothing arrived within the timeout; re-evaluate the loop condition.
            continue

        # Hold the lock through a context manager so it is always released,
        # even if updating the counter raises.
        with self.pool.queue_get_lock:
            self.pool.queue_processed += 1
            num = self.pool.queue_processed

        try:
            self.process(item, num)
        except BaseException:
            # Deliberately catches everything (same reach as the former bare
            # ``except:``) so the failure is recorded on the pool rather than lost.
            self.failed = True
            self.pool.exceptions.append(sys.exc_info())
            self.pool.kill()
def unique_everseen(iterable, key=None):
    """Yield each distinct element of ``iterable`` once, in first-seen order.

    Every element (or its ``key(element)`` value when ``key`` is given) is
    remembered for the life of the generator, so a repeat is dropped no
    matter how far apart the duplicates are.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    for candidate in iterable:
        # Without a key function the element itself is the identity marker.
        fingerprint = candidate if key is None else key(candidate)
        if fingerprint in observed:
            continue
        observed.add(fingerprint)
        yield candidate
def unique_everseen(iterable, key=None):
    """Yield each distinct element of ``iterable`` once, in first-seen order.

    Every element (or its ``key(element)`` value when ``key`` is given) is
    remembered for the life of the generator, so a repeat is dropped no
    matter how far apart the duplicates are.
    """
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    observed = set()
    for candidate in iterable:
        # Without a key function the element itself is the identity marker.
        fingerprint = candidate if key is None else key(candidate)
        if fingerprint in observed:
            continue
        observed.add(fingerprint)
        yield candidate
def test_key_loaded(self):
    """Check that ``local.encrypt`` opens the key file in binary read mode.

    A base64-encoded 16-byte random key is written to ``self.key_file``; the
    builtin ``open`` is then patched to hand back a handle on that file while
    ``local.encrypt`` runs, and the recorded call is verified.
    """
    with open(self.key_file, 'wb') as key_out:
        key_out.write(base64.b64encode(os.urandom(16)))

    # The real handle is opened before the patch is installed, so this open()
    # call is not the mocked one.
    with open(self.key_file) as key_in, \
            mock.patch.object(six.moves.builtins, 'open') as mock_open:
        mock_open.return_value = key_in
        local.encrypt(b'abc')

    mock_open.assert_called_with(self.key_file, 'rb')
def _detect_gce_environment():
    """Determine if the current environment is Compute Engine.

    Sends a ``GET /`` request carrying the metadata-flavor header to
    ``_GCE_METADATA_HOST`` and checks that the same flavor is echoed back.

    Returns:
        Boolean indicating whether or not the current environment is Google
        Compute Engine. ``False`` is returned for a non-200 response and for
        any ``socket.error`` raised while contacting the host.
    """
    # NOTE: The explicit ``timeout`` is a workaround. The underlying
    #       issue is that resolving an unknown host on some networks will take
    #       20-30 seconds; making this timeout short fixes the issue, but
    #       could lead to false negatives in the event that we are on GCE, but
    #       the metadata resolution was particularly slow. The latter case is
    #       "unlikely".
    client_lib = six.moves.http_client
    connection = client_lib.HTTPConnection(_GCE_METADATA_HOST, timeout=1)

    try:
        headers = {_METADATA_FLAVOR_HEADER: _DESIRED_METADATA_FLAVOR}
        connection.request('GET', '/', headers=headers)
        response = connection.getresponse()
        if response.status != client_lib.OK:
            # Previously this path fell off the end and returned None; return
            # an explicit boolean to honour the documented contract.
            return False
        return (response.getheader(_METADATA_FLAVOR_HEADER) ==
                _DESIRED_METADATA_FLAVOR)
    except socket.error:  # socket.timeout or socket.error(64, 'Host is down')
        logger.info('Timeout attempting to reach GCE metadata service.')
        return False
    finally:
        connection.close()
def _should_write_file(self, filepath):
# type: (str) -> bool
"""Determines whether a specific file should be written.
:param str filepath: Full file path to file in question
:rtype: bool
"""
if not os.path.isfile(filepath):
# The file does not exist, nothing to overwrite
return True
if self.no_overwrite:
# The file exists and the caller specifically asked us not to overwrite anything
_LOGGER.warning('Skipping existing output file because of "no overwrite" option: %s', filepath)
return False
if self.interactive:
# The file exists and the caller asked us to be consulted on action before overwriting
decision = six.moves.input( # type: ignore # six.moves confuses mypy
'Overwrite existing output file "{}" with new contents? [y/N]:'.format(filepath)
)
try:
if decision.lower()[0] == 'y':
_LOGGER.warning('Overwriting existing output file based on interactive user decision: %s', filepath)
return True
return False
except IndexError:
# No input is interpreted as 'do not overwrite'
_LOGGER.warning('Skipping existing output file based on interactive user decision: %s', filepath)
return False
# If we get to this point, the file exists and we should overwrite it
_LOGGER.warning('Overwriting existing output file because no action was specified otherwise: %s', filepath)
return True
def _should_retry_response(resp_status, content):
"""Determines whether a response should be retried.
Args:
resp_status: The response status received.
content: The response content body.
Returns:
True if the response should be retried, otherwise False.
"""
# Retry on 5xx errors.
if resp_status >= 500:
return True
# Retry on 429 errors.
if resp_status == _TOO_MANY_REQUESTS:
return True
# For 403 errors, we have to check for the `reason` in the response to
# determine if we should retry.
if resp_status == six.moves.http_client.FORBIDDEN:
# If there's no details about the 403 type, don't retry.
if not content:
return False
# Content is in JSON format.
try:
data = json.loads(content.decode('utf-8'))
reason = data['error']['errors'][0]['reason']
except (UnicodeDecodeError, ValueError, KeyError):
LOGGER.warning('Invalid JSON content from response: %s', content)
return False
LOGGER.warning('Encountered 403 Forbidden with reason "%s"', reason)
# Only retry on rate limit related failures.
if reason in ('userRateLimitExceeded', 'rateLimitExceeded', ):
return True
# Everything else is a success or non-retriable so break.
return False
def test_valid_answer_github(self):
    """A 'GitHub' answer at the prompt makes ``_configure_publish`` return 'github'."""
    with mock.patch.object(six.moves, 'input', return_value='GitHub'):
        outcome = _configure_publish()
    assert outcome == 'github'
def test_invalid_answer(self):
    """An unrecognised answer makes ``_configure_publish`` prompt again."""
    # Return a wrong answer the first time, and raise an exception
    # the second time (using this to verify the full recursive loop).
    scripted_replies = ('bogus', RuntimeError)
    with mock.patch.object(six.moves, 'input', side_effect=scripted_replies), \
            pytest.raises(RuntimeError):
        _configure_publish()
def _get_operation_code(op):
code = ''.join('v%d_%d = v%d;\n' % (op.num, i, v.num)
for i, v in enumerate(op.in_vars))
params = ['v%d_%d' % (op.num, i)
for i in six.moves.range(op.nin + op.nout)]
code += op.name + '(' + ', '.join(params) + ');\n'
code += ''.join('v%d = v%d_%d;\n' %
(v.num, op.num, i + op.nin)
for i, v in enumerate(op.out_vars))
return code
def test_multiple(self):
    """Ten consecutive calls with the same name end on the ``-0009`` file."""
    outcomes = [self._call("wow") for _ in six.moves.range(10)]
    # Only the result of the final call is inspected.
    handle, path = outcomes[-1]
    self.assertTrue(isinstance(handle, file_type))
    self.assertTrue(isinstance(path, str))
    self.assertTrue("wow-0009.conf" in path)
def test_plugins(self):
    """Invoke the ``plugins`` subcommand with every proper subset of its flags."""
    flags = ['--init', '--prepare', '--authenticators', '--installers']
    # Sizes run from 0 to len(flags) - 1, so the all-flags combination is
    # not exercised.
    subsets_by_size = [itertools.combinations(flags, size)
                       for size in six.moves.range(len(flags))]
    for chosen in itertools.chain.from_iterable(subsets_by_size):
        self._call(['plugins'] + list(chosen))
def get_all(self, argument_name, default_value=None):
    """Returns a list of query or POST arguments with the given name.

    We parse the query string and POST payload lazily, so this will be a
    slower operation on the first call.

    :param argument_name:
        The name of the query or POST argument.
    :param default_value:
        The value to return if the given argument is not present,
        None may not be used as a default, if it is then an empty
        list will be returned instead.
    :returns:
        A (possibly empty) list of values.
    """
    # Outside Python 3, the name is encoded with the request charset
    # before the lookup.
    if self.charset and not six.PY3:
        argument_name = argument_name.encode(self.charset)

    if default_value is None:
        default_value = []

    param_value = self.params.getall(argument_name)

    # Covers both "no result" (None) and an empty result.
    if not param_value:
        return default_value

    # Replace FieldStorage entries with their ``.value`` in place, so the
    # object returned is still the one ``getall`` produced.
    for index, entry in enumerate(param_value):
        if isinstance(entry, cgi.FieldStorage):
            param_value[index] = entry.value

    return param_value
def log_critical(self):
    """Return the translation function for critical-level log messages."""
    translator = self._make_log_translation_func('critical')
    return translator
# NOTE(dhellmann): When this module moves out of the incubator into
# oslo.i18n, these global variables can be moved to an integration
# module within each application.
# Create the global translation functions.