def __init__(
self, index_url="https://pypi.python.org/simple", hosts=('*',),
ca_bundle=None, verify_ssl=True, *args, **kw
):
Environment.__init__(self, *args, **kw)
self.index_url = index_url + "/" [:not index_url.endswith('/')]
self.scanned_urls = {}
self.fetched_urls = {}
self.package_pages = {}
self.allows = re.compile('|'.join(map(translate, hosts))).match
self.to_scan = []
use_ssl = (
verify_ssl
and ssl_support.is_available
and (ca_bundle or ssl_support.find_ca_bundle())
)
if use_ssl:
self.opener = ssl_support.opener_for(ca_bundle)
else:
self.opener = urllib.request.urlopen
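# A minimal standalone sketch of the trailing-slash idiom used for index_url above;
# the helper name below is illustrative and not part of setuptools. Because
# True == 1 and False == 0, "/"[:1] is "/" while "/"[:0] is "", so a slash is
# appended only when the URL does not already end with one.
def _ensure_trailing_slash(url):
    return url + "/"[:not url.endswith('/')]

assert _ensure_trailing_slash("https://pypi.python.org/simple") == "https://pypi.python.org/simple/"
assert _ensure_trailing_slash("https://pypi.python.org/simple/") == "https://pypi.python.org/simple/"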
def load_setting_sync_delay(logger, config_settings):
"""
Attempt to parse delay between sync loops from config settings
:param logger: the logger
:param config_settings: config settings loaded from config file
:return: extracted sync delay if valid, else DEFAULT_SYNC_DELAY_IN_SECONDS
"""
try:
sync_delay = config_settings['export_options']['sync_delay_in_seconds']
sync_delay_is_valid = re.match('^[0-9]+$', str(sync_delay))
if sync_delay_is_valid and sync_delay >= 0:
if sync_delay < DEFAULT_SYNC_DELAY_IN_SECONDS:
                logger.info('Sync delay is less than the minimum recommended value of ' +
                            '{0} seconds'.format(DEFAULT_SYNC_DELAY_IN_SECONDS))
return sync_delay
else:
logger.info('Invalid sync_delay_in_seconds from the configuration file, defaulting to {0}'.format(str(
DEFAULT_SYNC_DELAY_IN_SECONDS)))
return DEFAULT_SYNC_DELAY_IN_SECONDS
except Exception as ex:
log_critical_error(logger, ex,
'Exception parsing sync_delay from the configuration file, defaulting to {0}'.format(str(
DEFAULT_SYNC_DELAY_IN_SECONDS)))
return DEFAULT_SYNC_DELAY_IN_SECONDS
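# Standalone sketch of the digit-only check used above; the values are illustrative
# and DEFAULT_SYNC_DELAY_IN_SECONDS itself is defined elsewhere in the original module.
import re
assert re.match('^[0-9]+$', str(300))       # a non-negative integer passes
assert not re.match('^[0-9]+$', str(-5))    # negative values fail (leading '-')
assert not re.match('^[0-9]+$', 'fifteen')  # non-numeric strings fail as well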
def load_setting_api_access_token(logger, config_settings):
"""
Attempt to parse API token from config settings
:param logger: the logger
:param config_settings: config settings loaded from config file
:return: API token if valid, else None
"""
try:
api_token = config_settings['API']['token']
token_is_valid = re.match('^[a-f0-9]{64}$', api_token)
if token_is_valid:
logger.debug('API token matched expected pattern')
return api_token
else:
logger.error('API token failed to match expected pattern')
return None
except Exception as ex:
log_critical_error(logger, ex, 'Exception parsing API token from config.yaml')
return None
def load_setting_input_filename(logger, config_settings):
"""
Attempt to parse input filename from config settings
:param logger: the logger
:param config_settings: config settings loaded from config file
:return: input filename from config file if valid, else None
"""
try:
filename = config_settings['input_filename']
filename_is_valid = re.match('.+xls|.+xlsx', filename)
if filename_is_valid:
logger.debug('Filename matched expected pattern')
return filename
else:
logger.error('Filename failed to match expected pattern, acceptable formats are xls and xlsx')
return None
except Exception as ex:
        log_critical_error(logger, ex, 'Exception parsing input filename from config.yaml')
        return None
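# The pattern above accepts any filename containing "xls" after its first character
# and does not anchor the extension. A stricter, purely illustrative alternative
# (not the original project's code) would be:
import re
_strict_xls = re.compile(r'.+\.xlsx?$', re.IGNORECASE)
assert _strict_xls.match('audit_template.xlsx')
assert not _strict_xls.match('notes.txt')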
def _sort_names(names):
'''
Sort peeker names by index and alphabetically.
For example, the peeker names would be sorted as a[0], b[0], a[1], b[1], ...
'''
def index_key(lbl):
'''Index sorting.'''
        m = re.match(r'.*\[(\d+)\]$', lbl)  # Get the bracketed index.
if m:
return int(m.group(1)) # Return the index as an integer.
return -1 # No index found so it comes before everything else.
def name_key(lbl):
'''Name sorting.'''
        m = re.match(r'^([^\[]+)', lbl)  # Get name preceding bracketed index.
if m:
return m.group(1) # Return name.
return '' # No name found.
srt_names = sorted(names, key=name_key)
srt_names = sorted(srt_names, key=index_key)
return srt_names
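# Usage sketch for _sort_names above; the peeker labels are illustrative. Unindexed
# names sort first (index_key returns -1 for them), then entries are grouped by
# index and ordered alphabetically within each index:
assert _sort_names(['b[1]', 'a[0]', 'b[0]', 'a[1]', 'clk']) == \
    ['clk', 'a[0]', 'b[0]', 'a[1]', 'b[1]']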
def split_filename(filename, project_name=None):
"""
Extract name, version, python version from a filename (no extension)
Return name, version, pyver or None
"""
result = None
pyver = None
filename = unquote(filename).replace(' ', '-')
m = PYTHON_VERSION.search(filename)
if m:
pyver = m.group(1)
filename = filename[:m.start()]
if project_name and len(filename) > len(project_name) + 1:
m = re.match(re.escape(project_name) + r'\b', filename)
if m:
n = m.end()
result = filename[:n], filename[n + 1:], pyver
if result is None:
m = PROJECT_NAME_AND_VERSION.match(filename)
if m:
result = m.group(1), m.group(3), pyver
return result
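# Minimal standalone sketch of the name/version split performed above. The regex
# below is an illustrative stand-in; the real PYTHON_VERSION and
# PROJECT_NAME_AND_VERSION patterns are defined elsewhere in the source module.
import re
_NAME_VERSION_SKETCH = re.compile(r'^(?P<name>[\w.+-]+?)-(?P<version>\d[\w.!+-]*)$')
m = _NAME_VERSION_SKETCH.match('requests-2.18.4')
assert m and m.group('name') == 'requests' and m.group('version') == '2.18.4'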
# Allow spaces in name because of legacy dists like "Twisted Core"
def __init__(self, filename):
"""
:raises InvalidWheelFilename: when the filename is invalid for a wheel
"""
wheel_info = self.wheel_file_re.match(filename)
if not wheel_info:
raise InvalidWheelFilename(
"%s is not a valid wheel filename." % filename
)
self.filename = filename
self.name = wheel_info.group('name').replace('_', '-')
# we'll assume "_" means "-" due to wheel naming scheme
# (https://github.com/pypa/pip/issues/1150)
self.version = wheel_info.group('ver').replace('_', '-')
self.pyversions = wheel_info.group('pyver').split('.')
self.abis = wheel_info.group('abi').split('.')
self.plats = wheel_info.group('plat').split('.')
# All the tag combinations from this file
self.file_tags = set(
(x, y, z) for x in self.pyversions
for y in self.abis for z in self.plats
)
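# Illustrative sketch of the kind of pattern wheel_file_re (a class attribute
# defined elsewhere) compiles; the real pattern is more permissive, e.g. it also
# allows an optional build tag. This simplified stand-in shows the tag fields:
import re
_wheel_re_sketch = re.compile(
    r'^(?P<name>.+?)-(?P<ver>.+?)-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)\.whl$'
)
m = _wheel_re_sketch.match('requests-2.18.4-py2.py3-none-any.whl')
assert m and m.group('pyver') == 'py2.py3' and m.group('abi') == 'none' and m.group('plat') == 'any'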
def _needs_hiding(mod_name):
"""
>>> _needs_hiding('setuptools')
True
>>> _needs_hiding('pkg_resources')
True
>>> _needs_hiding('setuptools_plugin')
False
>>> _needs_hiding('setuptools.__init__')
True
>>> _needs_hiding('distutils')
True
>>> _needs_hiding('os')
False
>>> _needs_hiding('Cython')
True
"""
pattern = re.compile(r'(setuptools|pkg_resources|distutils|Cython)(\.|$)')
return bool(pattern.match(mod_name))
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
append=False):
"""Insert an entry into the list of warnings filters (at the front).
'action' -- one of "error", "ignore", "always", "default", "module",
or "once"
'message' -- a regex that the warning message must match
'category' -- a class that the warning must be a subclass of
'module' -- a regex that the module name must match
'lineno' -- an integer line number, 0 matches all warnings
'append' -- if true, append to the list of filters
"""
import re
assert action in ("error", "ignore", "always", "default", "module",
"once"), "invalid action: %r" % (action,)
assert isinstance(message, str), "message must be a string"
assert isinstance(category, type), "category must be a class"
assert issubclass(category, Warning), "category must be a Warning subclass"
assert isinstance(module, str), "module must be a string"
assert isinstance(lineno, int) and lineno >= 0, \
"lineno must be an int >= 0"
_add_filter(action, re.compile(message, re.I), category,
re.compile(module), lineno, append=append)
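# Usage sketch against the standard-library implementation shown above; the
# message argument is a regex compiled case-insensitively:
import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("error", message="^unchecked", category=UserWarning)
    try:
        warnings.warn("unchecked value", UserWarning)
    except UserWarning:
        pass  # the filter escalated the matching warning to an exception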
def _getcategory(category):
import re
if not category:
return Warning
if re.match("^[a-zA-Z0-9_]+$", category):
try:
cat = eval(category)
except NameError:
raise _OptionError("unknown warning category: %r" % (category,))
else:
i = category.rfind(".")
module = category[:i]
klass = category[i+1:]
try:
m = __import__(module, None, None, [klass])
except ImportError:
raise _OptionError("invalid module name: %r" % (module,))
try:
cat = getattr(m, klass)
except AttributeError:
raise _OptionError("unknown warning category: %r" % (category,))
if not issubclass(cat, Warning):
raise _OptionError("invalid warning category: %r" % (category,))
return cat
def __exit__(self, *exc_info):
if not self._entered:
raise RuntimeError("Cannot exit %r without entering first" % self)
self._module.filters = self._filters
self._module._filters_mutated()
self._module.showwarning = self._showwarning
self._module._showwarnmsg_impl = self._showwarnmsg_impl
# filters contains a sequence of filter 5-tuples
# The components of the 5-tuple are:
# - an action: error, ignore, always, default, module, or once
# - a compiled regex that must match the warning message
# - a class representing the warning category
# - a compiled regex that must match the module that is being warned
# - a line number for the line being warned about, or 0 to mean any line
# If either of the compiled regexes is None, match anything.
def b64decode(s, altchars=None, validate=False):
"""Decode the Base64 encoded bytes-like object or ASCII string s.
Optional altchars must be a bytes-like object or ASCII string of length 2
which specifies the alternative alphabet used instead of the '+' and '/'
characters.
The result is returned as a bytes object. A binascii.Error is raised if
s is incorrectly padded.
If validate is False (the default), characters that are neither in the
normal base-64 alphabet nor the alternative alphabet are discarded prior
to the padding check. If validate is True, these non-alphabet characters
in the input result in a binascii.Error.
"""
s = _bytes_from_decode_data(s)
if altchars is not None:
altchars = _bytes_from_decode_data(altchars)
assert len(altchars) == 2, repr(altchars)
s = s.translate(bytes.maketrans(altchars, b'+/'))
if validate and not re.match(b'^[A-Za-z0-9+/]*={0,2}$', s):
raise binascii.Error('Non-base64 digit found')
return binascii.a2b_base64(s)
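# Usage sketch with the standard-library binding of this function:
import base64
assert base64.b64decode('aGVsbG8=') == b'hello'
assert base64.b64decode(b'aGVsbG8=', validate=True) == b'hello'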
def get_model_filenames(model_dir):
files = os.listdir(model_dir)
meta_files = [s for s in files if s.endswith('.meta')]
if len(meta_files)==0:
raise ValueError('No meta file found in the model directory (%s)' % model_dir)
elif len(meta_files)>1:
raise ValueError('There should not be more than one meta file in the model directory (%s)' % model_dir)
meta_file = meta_files[0]
meta_files = [s for s in files if '.ckpt' in s]
max_step = -1
for f in files:
step_str = re.match(r'(^model-[\w\- ]+.ckpt-(\d+))', f)
if step_str is not None and len(step_str.groups())>=2:
step = int(step_str.groups()[1])
if step > max_step:
max_step = step
ckpt_file = step_str.groups()[0]
return meta_file, ckpt_file
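# Illustrative check of the checkpoint pattern used above; the filename is made up:
import re
m = re.match(r'(^model-[\w\- ]+.ckpt-(\d+))', 'model-20170512-110547.ckpt-250000')
assert m and int(m.groups()[1]) == 250000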
def sms():
if len(sys.argv) < 2:
print(help_text)
return
if sys.argv[1] == "send":
if len(sys.argv) < 3:
print("????? ????? ?? ?? ???.")
return
if not re.match(r"[\+98|0]9[0-9]*",sys.argv[2]):
print("????? ???? ??? ?????? ???.tetete")
return
number = sys.argv[2]
        if re.match(r"^\+98", number):
            number = re.sub(r"^\+98", "0", number)
text = sys.argv[3]
if len(text) > 100:
print("????? ??????? ??? ??? ????? ???? ???.")
return
send_sms(number, text, str(time.time()))
return
if sys.argv[1] == "credits":
get_credits()
return
def _ungroup_go_imports(fname):
with open(fname, 'r+') as f:
content = f.readlines()
out = []
import_block = False
for line in content:
c = line.strip()
if import_block:
if c == '':
continue
elif re.match(END_IMPORT_REGEX, c) is not None:
import_block = False
elif re.match(BEGIN_IMPORT_REGEX, c) is not None:
import_block = True
out.append(line)
f.seek(0)
f.writelines(out)
f.truncate()
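# Hedged sketch of what the module-level regexes referenced above might look like;
# the real BEGIN_IMPORT_REGEX and END_IMPORT_REGEX are defined elsewhere in that
# project, so these are illustrative stand-ins for a Go import block:
import re
BEGIN_IMPORT_SKETCH = r'^import\s*\($'
END_IMPORT_SKETCH = r'^\)$'
assert re.match(BEGIN_IMPORT_SKETCH, 'import (')
assert re.match(END_IMPORT_SKETCH, ')')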
def check_header_validity(header):
"""Verifies that header value is a string which doesn't contain
leading whitespace or return characters. This prevents unintended
header injection.
:param header: tuple, in the format (name, value).
"""
name, value = header
if isinstance(value, bytes):
pat = _CLEAN_HEADER_REGEX_BYTE
else:
pat = _CLEAN_HEADER_REGEX_STR
try:
if not pat.match(value):
raise InvalidHeader("Invalid return character or leading space in header: %s" % name)
except TypeError:
raise InvalidHeader("Value for header {%s: %s} must be of type str or "
"bytes, not %s" % (name, value, type(value)))
def zap_pyfiles(self):
log.info("Removing .py files from temporary directory")
for base, dirs, files in walk_egg(self.bdist_dir):
for name in files:
path = os.path.join(base, name)
if name.endswith('.py'):
log.debug("Deleting %s", path)
os.unlink(path)
if base.endswith('__pycache__'):
path_old = path
pattern = r'(?P<name>.+)\.(?P<magic>[^.]+)\.pyc'
m = re.match(pattern, name)
path_new = os.path.join(base, os.pardir, m.group('name') + '.pyc')
log.info("Renaming file from [%s] to [%s]" % (path_old, path_new))
try:
os.remove(path_new)
except OSError:
pass
os.rename(path_old, path_new)
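# Illustrative check of the cache-file pattern used above: it strips the interpreter
# "magic" tag so that, e.g., pkg/__pycache__/mod.cpython-36.pyc is renamed to pkg/mod.pyc.
import re
m = re.match(r'(?P<name>.+)\.(?P<magic>[^.]+)\.pyc', 'mod.cpython-36.pyc')
assert m and m.group('name') == 'mod' and m.group('magic') == 'cpython-36'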
def find_external_links(url, page):
"""Find rel="homepage" and rel="download" links in `page`, yielding URLs"""
for match in REL.finditer(page):
tag, rel = match.groups()
rels = set(map(str.strip, rel.lower().split(',')))
if 'homepage' in rels or 'download' in rels:
for match in HREF.finditer(tag):
yield urllib.parse.urljoin(url, htmldecode(match.group(1)))
for tag in ("<th>Home Page", "<th>Download URL"):
pos = page.find(tag)
if pos != -1:
match = HREF.search(page, pos)
if match:
yield urllib.parse.urljoin(url, htmldecode(match.group(1)))
def get_all_users(show_teachers=False):
"""
Finds all the users in the database
Args:
show_teachers: whether or not to include teachers in the response
Returns:
Returns the uid, username, and email of all users.
"""
db = api.common.get_conn()
match = {}
projection = {"uid": 1, "username": 1, "email": 1, "tid": 1}
if not show_teachers:
match.update({"teacher": False})
projection.update({"teacher": 1})
return list(db.users.find(match, projection))
def run(self):
view = self.window.active_view()
pt = view.sel()[0]
scope = view.scope_name(pt.begin()).rstrip()
# TODO Fix jumptags scopes (rename them to less generic scopes)
jumptag_scopes = [
'text.neovintageous.help string.neovintageous',
'text.neovintageous.help support.constant.neovintageous'
]
if scope not in jumptag_scopes:
return
subject = view.substr(view.extract_scope(pt.begin()))
if len(subject) < 3:
return
        match = re.match(r"^'[a-z_]+'|\|[^\s|]+\|$", subject)
if match:
subject = subject.strip('|')
# TODO Refactor ex_help code into a reusable middle layer so that
# this command doesn't have to call the ex command.
self.window.run_command('ex_help', {'command_line': 'help ' + subject})
def find_line_text_object(view, s):
"""Implement the line object."""
line = view.line(s)
line_content = view.substr(line)
begin = line.begin()
end = line.end()
whitespace_match = re.match("\s+", line_content)
if whitespace_match:
begin = begin + len(whitespace_match.group(0))
return (begin, end)
# TODO: Move this to units.py.
def _url(regex, text):
match = re.match(regex, text)
if match:
url = match.group('url')
# Remove end of line full stop character.
url = url.rstrip('.')
# Remove closing tag markdown link e.g. [title](url)
url = url.rstrip(')')
        # Remove the closing characters of a markdown image link, e.g. a trailing ")]"
if url[-2:] == ')]':
url = url[:-2]
return url
return None
def check_callbacks(bot, trigger, url, run=True):
"""
Check the given URL against the callbacks list. If it matches, and ``run``
is given as ``True``, run the callback function, otherwise pass. Returns
``True`` if the url matched anything in the callbacks list.
"""
# Check if it matches the exclusion list first
matched = any(regex.search(url) for regex in bot.memory['url_exclude'])
# Then, check if there's anything in the callback list
for regex, function in tools.iteritems(bot.memory['url_callbacks']):
match = regex.search(url)
if match:
if run:
function(bot, trigger, match)
matched = True
return matched
def multi_stat_check(args, filename):
dict = {}
try:
with open(args.container + "/" + filename, "r") as f:
for line in f:
m = _STAT_RE.match(line)
if m:
dict[m.group(1)] = m.group(2)
    except Exception as e:
if not os.path.isdir(args.container):
os.mkdir(args.container)
debug(args.container + ": could not get last stats from " + filename)
debug(str(e))
# first time running for this container create empty file
open(args.container + "/" + filename,"w").close()
return dict
def multi_stat_update(args, container_dir, filename):
dict = {}
try:
pipe = os.popen("docker exec " + args.container + " cat " + container_dir + "/" + filename + " 2>&1")
for line in pipe:
m = _STAT_RE.match(line)
if m:
dict[m.group(1)] = m.group(2)
pipe.close()
f = open(args.container + "/" + filename,"w")
for key in dict.keys():
f.write(key + " " + dict[key] + "\n")
f.close()
    except Exception:
debug(args.container + ": could not update " + filename)
debug(str(sys.exc_info()))
return dict
def parse(version):
"""Parse version to major, minor, patch, pre-release, build parts.
:param version: version string
:return: dictionary with the keys 'build', 'major', 'minor', 'patch',
and 'prerelease'. The prerelease or build keys can be None
if not provided
:rtype: dict
"""
match = _REGEX.match(version)
if match is None:
raise ValueError('%s is not valid SemVer string' % version)
version_parts = match.groupdict()
version_parts['major'] = int(version_parts['major'])
version_parts['minor'] = int(version_parts['minor'])
version_parts['patch'] = int(version_parts['patch'])
return version_parts
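# Standalone sketch of the kind of pattern _REGEX compiles; this simplified version
# is illustrative, not the module's actual regex:
import re
_SEMVER_SKETCH = re.compile(
    r'^(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)'
    r'(?:-(?P<prerelease>[0-9A-Za-z.-]+))?'
    r'(?:\+(?P<build>[0-9A-Za-z.-]+))?$'
)
parts = _SEMVER_SKETCH.match('3.4.5-pre.2+build.4').groupdict()
assert parts['major'] == '3' and parts['prerelease'] == 'pre.2' and parts['build'] == 'build.4'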
def _nat_cmp(a, b):
def convert(text):
        return int(text) if re.match('^[0-9]+$', text) else text
def split_key(key):
return [convert(c) for c in key.split('.')]
def cmp_prerelease_tag(a, b):
if isinstance(a, int) and isinstance(b, int):
return cmp(a, b)
elif isinstance(a, int):
return -1
elif isinstance(b, int):
return 1
else:
return cmp(a, b)
a, b = a or '', b or ''
a_parts, b_parts = split_key(a), split_key(b)
for sub_a, sub_b in zip(a_parts, b_parts):
cmp_result = cmp_prerelease_tag(sub_a, sub_b)
if cmp_result != 0:
return cmp_result
else:
return cmp(len(a), len(b))
def get_iface_from_addr(addr):
"""Work out on which interface the provided address is configured."""
for iface in netifaces.interfaces():
addresses = netifaces.ifaddresses(iface)
for inet_type in addresses:
for _addr in addresses[inet_type]:
_addr = _addr['addr']
# link local
ll_key = re.compile("(.+)%.*")
raw = re.match(ll_key, _addr)
if raw:
_addr = raw.group(1)
if _addr == addr:
log("Address '%s' is configured on iface '%s'" %
(addr, iface))
return iface
msg = "Unable to infer net iface on which '%s' is configured" % (addr)
raise Exception(msg)
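# Illustrative check of the link-local handling above: a zone index such as '%eth0'
# is stripped before the address comparison:
import re
raw = re.match("(.+)%.*", 'fe80::1%eth0')
assert raw and raw.group(1) == 'fe80::1'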