def _IsPrefixedWithAlias(self, query: str, tables: [str],
column_name: str) -> bool:
"""If the column has a table prefixed and has an alias
Args:
query (str): the query to parse
tables ([str]): the possible tables
column_name (str): the column name
Returns:
bool: True if it is prefixed, false if it isn't.
"""
matches = [
re.fullmatch(
'.*({0}.)+([^ ])*( )*(as)( )*{1}( ,)*.*'.format(table, column_name),
query)
for table in tables if re.fullmatch(
'.*({0}.)+([^ ])*( )*(as)( )*{1}( ,)*.*'.format(
table, column_name), query) is not None]
return len(matches) != 0
# Python re.fullmatch() example source code
def _IsPrefixedWithoutAlias(self, query: str, tables: [str],
column_name: str) -> bool:
"""If the column has a table prefixed and has an no alias
Args:
query (str): the query to parse
tables ([str]): the possible tables
column_name (str): the column name
Returns:
bool: True if it is prefixed, false if it isn't.
"""
matches = [
re.fullmatch(
'.*({0}.{1})+([^ ])*( ,)*.*'.format(table, column_name), query)
for table in tables if re.fullmatch(
'.*({0}.{1})+([^ ])*( ,)*.*'.format(
table, column_name), query) is not None]
return len(matches) != 0
def getFiles(wc_name):
    """Return the files in one directory whose path matches a wildcard name.

    Only the last path component of wc_name may contain '*'. The directory
    part is listed non-recursively (first result of os.walk) and each file is
    compared, in backslash-separated form, against wc_name where '*' stands
    for any run of characters.

    :param wc_name: path whose final component may contain '*' wildcards
    :return: list of os.path.join(directory, filename) for matching files;
             empty if the directory yields nothing from os.walk
    :raises ValueError: if the directory part itself contains '*'
    """
    # assume wild card is in the last part
    path, _ = os.path.split(wc_name)
    if '*' in path:
        raise ValueError('must implement getFiles better')
    try:
        _, _, files = next(os.walk(os.path.normpath(path)))
    except StopIteration:
        return []
    # Build the matcher once. re.escape neutralises every regex metacharacter
    # (not just '.', '(' and ')'); the escaped '*' is then turned back into a
    # wildcard.
    normalised = wc_name.replace('/', '\\')
    matcher = re.compile(re.escape(normalised).replace('\\*', '.*'))
    prefix = path.replace('/', '\\') + '\\'
    return [os.path.join(path, com) for com in files
            if matcher.fullmatch(prefix + com)]
def set_heritage_with_date(self):
    """
    Add the heritage status statement (bien interes cultural).

    If self.bic does not have the expected format, the item is flagged with
    upload = False and nothing is added. Otherwise the statement is added,
    with a start time qualifier when the "fecha" attribute parses as a
    Spanish date; an unparseable date is reported and the qualifier is empty.
    """
    bic_format = r"(ARI|RI)-(AR|BI|MU|51|52|53|54|55|56)-\d{7}(|-000\d|-000\d\d)"
    if re.fullmatch(bic_format, self.bic) is None:
        self.upload = False
        return

    # it is a proper bic
    heritage_item = self.mapping["heritage"]["item"]
    if not self.has_non_empty_attribute("fecha"):
        self.add_statement("heritage_status", heritage_item)
        return

    # e.g. "20 de febrero de 1985"
    parsed = dateparser.parse(self.fecha, languages=['es'])
    if parsed:
        as_dict = utils.datetime_object_to_dict(parsed)
        start = {"start_time": utils.package_time(as_dict)}
    else:
        start = {}
        self.add_to_report("fecha", self.fecha, "start_time")
    self.add_statement("heritage_status", heritage_item, start)
def remove_stopwords(self, tokens):
    """Remove stopwords from token list.

    A token is dropped when it is in the raw stopword collection or when it
    fully matches any of the regexp stopwords.

    :param tokens: tokens from which stopwords should be removed
    :type tokens: list
    :return: tokens with filtered out stopwords
    """
    ret = []
    for token in tokens:
        if token in self._raw_stopwords:
            _logger.debug("Dropping raw stopword '%s'", token)
            continue

        # Find the first regexp that matches; a bare `continue` inside a
        # nested loop over the regexps would only skip to the next regexp and
        # the token would still be kept.
        matched = next((regexp for regexp in self._regexp_stopwords
                        if re.fullmatch(regexp, token)), None)
        if matched is not None:
            _logger.debug("Dropping stopword '%s' based on regexp '%s'", token,
                          matched.pattern)
            continue

        ret.append(token)
    return ret
def _getmoji(self, msg):
# Set or disable the "spam_reaction" entry of self.plugin_config for the guild
# of msg, based on the second whitespace-separated word of msg.content.
# NOTE(review): the body uses `await`, which is only valid inside `async def`;
# the `async` keyword and the indentation appear to have been lost - confirm
# against the upstream source before relying on this block.
args = msg.content.split()
if len(args) > 1:
# Single character argument: validate it by trying to react with it.
if len(args[1]) == 1:
try:
await msg.add_reaction(args[1])
except HTTPException:
raise CommandSyntaxError(f"Non-emoji character set as spam reaction on server.")
else:
# The reaction succeeded: store the character, confirm, then remove the
# probe reaction again.
self.plugin_config[str(msg.guild.id)]["spam_reaction"] = args[1]
await respond(msg, f"**AFFIRMATIVE. ANALYSIS: New spam reaction emoji: {args[1]}.**")
await msg.remove_reaction(args[1], msg.guild.me)
# Argument shaped like <:name:id>: extract the first run of digits as the id.
elif re.fullmatch("<:\w{1,32}:\d{1,20}>", args[1]):
t_emoji = re.search("\d{1,20}", args[1])[0]
if self.client.get_emoji(int(t_emoji)):
# The id is stored as a string, left-padded with zeros to 18 characters.
self.plugin_config[str(msg.guild.id)]["spam_reaction"] = t_emoji.rjust(18, "0")
await respond(msg, f"**AFFIRMATIVE. ANALYSIS: New spam reaction emoji: {args[1]}.**")
# NOTE(review): without indentation it is unclear whether this `else` belongs
# to the `if self.client.get_emoji(...)` above or to the if/elif chain on
# args[1]; the following `else` presumably pairs with `if len(args) > 1`.
else:
raise CommandSyntaxError("Expected a single emoji as argument.")
else:
# Presumably the no-argument case: the stored value False disables the reaction.
self.plugin_config[str(msg.guild.id)]["spam_reaction"] = False
await respond(msg, f"**AFFIRMATIVE. Spam reaction disabled.**")
def search_photos(picturepath):
    """Walk picturepath and collect every file whose extension is a picture type.

    The extension (without the leading dot) must fully match, ignoring case,
    the alternation of the entries in PICTURE_TYPES.

    :param picturepath: root directory to walk; must be truthy
    :return: list of dicts with the keys 'directory' and 'filename'
    """
    assert picturepath, "picturepath not supplied"
    logger.debug('picturepath<%s>' % picturepath)

    # The pattern does not depend on the file being tested: build it once.
    extension_re = re.compile('|'.join(PICTURE_TYPES), re.IGNORECASE)

    newsearch = []
    for directory, __dirs, files in os.walk(picturepath):
        for filename in files:
            if extension_re.fullmatch(os.path.splitext(filename)[1][1:]):
                newsearch.append({
                    'directory': directory,
                    'filename': filename,
                })
    logger.debug("Number of photos found: %d" % len(newsearch))
    return newsearch
def _get_flickr_id_secret_title_extension(filename):
    """Split a file name into its photo id, secret, title and extension.

    The name is expected to look like
    ``<APPLICATION_NAME>_<digits>_<alphanumerics>_<title>.<picture type>``
    (matched case-insensitively). Parts that cannot be extracted are
    returned as UNDEFINED.

    :param filename: file name to parse
    :return: tuple (photoid, secret, title, extension)
    """
    photoid = secret = title = extension = UNDEFINED

    # the flickr title can be null
    found = re.fullmatch(
        '^' + general.APPLICATION_NAME
        + r'_([0-9]+)_([a-z0-9]+)_(.*)\.(' + '|'.join(PICTURE_TYPES) + ')',
        filename, re.IGNORECASE)
    # Test the match explicitly instead of catching AttributeError, so that
    # unrelated attribute errors are not silently reported as "Not found".
    if found is None:
        logger.debug('Not found')
    else:
        photoid, secret, title, extension = found.group(1, 2, 3, 4)

    logger.debug('<{filename}>, <{photoid}>, <{secret}>, <{title}>, <{extension}>'.format(
        filename=filename, photoid=photoid, secret=secret, title=title, extension=extension))
    return photoid, secret, title, extension
def obtener_aptitud(genes, deseadas, noDeseadas):
    """Score the regex built from genes against wanted and unwanted strings.

    The pattern comes from reparar_regex(genes). If it does not compile, the
    error is recorded in erroresEnRegexes (keeping, per error kind, the entry
    with the shortest gene listing) and a zero-match Aptitud is returned.
    Otherwise the number of fully matching strings in each collection is
    counted.

    :param genes: sequence of strings the pattern is built from
    :param deseadas: strings the pattern should match
    :param noDeseadas: strings the pattern should not match
    :return: Aptitud(matched wanted, len(deseadas), matched unwanted,
             pattern length)
    """
    patrón = reparar_regex(genes)
    longitud = len(patrón)

    try:
        compilado = re.compile(patrón)
    except re.error as e:
        mensaje = str(e)
        # Not every re.error message carries an "at position" suffix;
        # partition() keeps the whole message as the key in that case instead
        # of raising ValueError like str.index() would.
        llave = mensaje.partition("at position")[0]
        info = [mensaje,
                "genes = ['{}']".format("', '".join(genes)),
                "regex: " + patrón]
        if llave not in erroresEnRegexes or len(info[1]) < len(
                erroresEnRegexes[llave][1]):
            erroresEnRegexes[llave] = info
        return Aptitud(0, len(deseadas), len(noDeseadas), longitud)

    # Reuse the compiled pattern for every candidate string.
    númeroDeDeseadosQueCoincidieron = sum(
        1 for i in deseadas if compilado.fullmatch(i))
    númeroDeNoDeseadosQueCoincidieron = sum(
        1 for i in noDeseadas if compilado.fullmatch(i))
    return Aptitud(númeroDeDeseadosQueCoincidieron, len(deseadas),
                   númeroDeNoDeseadosQueCoincidieron, longitud)
def get_fitness(genes, wanted, unwanted):
    """Score the regex built from genes against wanted and unwanted strings.

    The pattern comes from repair_regex(genes). If it does not compile, the
    error is recorded in regexErrorsSeen (keeping, per error kind, the entry
    with the shortest gene listing) and a zero-match Fitness is returned.
    Otherwise the number of fully matching strings in each collection is
    counted.

    :param genes: sequence of strings the pattern is built from
    :param wanted: strings the pattern should match
    :param unwanted: strings the pattern should not match
    :return: Fitness(matched wanted, len(wanted), matched unwanted,
             pattern length)
    """
    pattern = repair_regex(genes)
    length = len(pattern)

    try:
        compiled = re.compile(pattern)
    except re.error as e:
        message = str(e)
        # Not every re.error message carries an "at position" suffix;
        # partition() keeps the whole message as the key in that case instead
        # of raising ValueError like str.index() would.
        key = message.partition("at position")[0]
        info = [message,
                "genes = ['{}']".format("', '".join(genes)),
                "regex: " + pattern]
        if key not in regexErrorsSeen or len(info[1]) < len(
                regexErrorsSeen[key][1]):
            regexErrorsSeen[key] = info
        return Fitness(0, len(wanted), len(unwanted), length)

    # Reuse the compiled pattern for every candidate string.
    numWantedMatched = sum(1 for i in wanted if compiled.fullmatch(i))
    numUnwantedMatched = sum(1 for i in unwanted if compiled.fullmatch(i))
    return Fitness(numWantedMatched, len(wanted), numUnwantedMatched,
                   length)
def install_command(theme, branch, name):
    """Install a theme by cloning it with git into the themes directory.

    A plain name such as "default" is cloned from the branch of that name in
    veripress/themes; "someone/the-theme" is cloned from that GitHub
    repository using the given branch.

    :param theme: theme name, or "user/repo" for a third-party theme
    :param branch: branch to clone for a third-party theme
    :param name: optional directory name for the installed theme
    :raises click.BadArgumentUsage: if theme has neither accepted form
    """
    import subprocess  # local import: the file's import block is not visible here

    if re.fullmatch(r'[_\-A-Z0-9a-z]+', theme):
        theme_name = name or theme
        clone_branch = theme
        repo_url = 'https://github.com/veripress/themes.git'
    else:
        m = re.fullmatch(r'([_\-A-Z0-9a-z]+)/([_\-A-Z0-9a-z]+)', theme)
        if not m:
            raise click.BadArgumentUsage('The theme should be like "default" (branch of veripress/themes) '
                                         'or "someone/the-theme" (third-party theme on GitHub)')
        user = m.group(1)
        repo = m.group(2)
        theme_name = name or repo
        clone_branch = branch
        repo_url = 'https://github.com/{}/{}.git'.format(user, repo)

    theme_path = os.path.join(get_themes_dir(), theme_name)

    # Pass the arguments as a list: branch and name are not validated, so
    # interpolating them into a shell command line would allow injection and
    # would break on paths containing spaces.
    try:
        exit_code = subprocess.call(
            ['git', 'clone', '--branch', str(clone_branch), repo_url, theme_path])
    except OSError:
        # git could not be started at all (e.g. it is not installed)
        exit_code = 1

    if exit_code == 0:
        click.echo('\n"{}" theme has been installed successfully.'.format(theme_name))
    else:
        click.echo('\nSomething went wrong. Do you forget to install git? '
                   'Or is there another theme with same name existing?')
def _process_client_archive_info(self, archive_info):
    """Check an archive pushed by the client and record it in the manifest.

    The archive name must start with the job's archive name. A name of the
    form "<job archive name>.checkpoint<digits>" is remembered as a
    checkpoint; any other name marks the final archive. The archive is then
    synchronised into the cache and stored in the manifest.

    :param archive_info: object providing name, id and ts of the archive
    :raises ValueError: on an unexpected archive name or a failed cache sync
    """
    name = archive_info.name
    base_name = self.job.archive_name

    if not name.startswith(base_name):
        log.error('Client tried to push invalid archive %r (id=%s) to repository. Aborting.',
                  name, bin_to_hex(archive_info.id))
        raise ValueError('BorgCube: illegal archive push %r' % name)

    log.debug('Adding archive %r (id %s)', name, bin_to_hex(archive_info.id))

    is_checkpoint = re.fullmatch(re.escape(base_name) + r'\.checkpoint(\d+)?', name) is not None
    if is_checkpoint:
        log.debug('%r is a checkpoint - remembering that', name)
        self._add_checkpoint(archive_info.id)
    else:
        log.debug('%r is the finalised archive', name)
        self._final_archive = True

    synced = self._cache_sync_archive(archive_info.id)
    if not synced:
        log.error('Failed to synchronize archive %r into cache (see above), aborting.', name)
        raise ValueError('BorgCube: cache sync failed')

    # TODO additional sanitation?
    self._manifest.archives[name] = (archive_info.id, archive_info.ts)
    log.info('Added archive %r (id %s) to repository.', name, bin_to_hex(archive_info.id))
    self._got_archive = True
def match(self, domain, **kwargs):
    """Return a copy of this config specialised for domain, or None.

    The domain must fully match self.pattern. In the returned deep copy the
    execute command and the deploy paths are formatted with the domain as
    positional argument 0, the pattern's groups as the following positional
    arguments (unmatched groups become ''), plus ``domain`` and any extra
    keyword arguments.
    """
    found = re.fullmatch(self.pattern, domain)
    if not found:
        return None

    positional = (domain,) + found.groups(default='')

    def expand(template):
        return template.format(*positional, domain=domain, **kwargs)

    newconfig = deepcopy(self)
    if self.execute:
        newconfig.execute.command = expand(self.execute.command)
    for attr in ('deploy_crt', 'deploy_key', 'deploy_chain', 'deploy_full_chain'):
        source = getattr(self, attr)
        if source:
            getattr(newconfig, attr).path = expand(source.path)
    return newconfig
def get_from_paths(sub_path, file_pattern):
    """
    Search through the AUTOCOMPOSE_PATHs for files in the sub-path which match the given file_pattern

    :param sub_path: The sub-path to look for files in each autocompose path directory.
    :param file_pattern: A regular expression the whole file name must match.
    :return: A list of paths (autocompose path / sub-path / file name) of the matching files.
    :raises KeyError: if the AUTOCOMPOSE_PATH environment variable is not set.
    """
    paths = os.environ['AUTOCOMPOSE_PATH'].split(":")
    results = []
    for path in paths:
        directory = os.path.join(path, sub_path)
        try:
            files = os.listdir(path=directory)
        except FileNotFoundError:
            # this autocompose path has no such sub-directory
            continue
        # Append the matched file name, not the pattern it was matched against.
        results.extend(os.path.join(directory, file)
                       for file in files if re.fullmatch(file_pattern, file))
    return results
def get(self, v, full_match=False):
    """ Return a new table restricted to the columns selected by v

    The keys are obtained from ``self.keys(v)``. The result is built with the
    same class from ``self[new_keys]`` and receives a copy of the header and
    those alias, unit and description entries whose *value* is one of the
    selected keys.

    Parameters
    ----------
    v: str
        pattern to filter the keys with

    full_match: bool
        if set, use :func:`re.fullmatch` instead of :func:`re.match`
        NOTE(review): this flag is not forwarded to ``self.keys`` in this
        body - confirm whether that is intended.

    Returns
    -------
    t: same class as self
        table holding the selected columns
    """
    new_keys = self.keys(v)

    def selected(mapping):
        # NOTE(review): the filter is on the mapping's values for all three
        # dictionaries; verify that this is right for units and descriptions.
        return ((name, target) for (name, target) in mapping.items()
                if target in new_keys)

    t = self.__class__(self[new_keys])
    t.header.update(**self.header)
    t._aliases.update(selected(self._aliases))
    t._units.update(selected(self._units))
    t._desc.update(selected(self._desc))
    return t
def test_show_missing_relative_app_version():
    """Showing a relative app version far beyond the history must fail.

    The command is expected to exit with 1, print nothing on stdout and
    report on stderr how many versions the application has.
    """
    app_id = _ZERO_INSTANCE_APP_ID
    update_path = 'tests/data/marathon/apps/update_zero_instance_sleep.json'
    expected_stderr = "Application 'zero-instance-app' only has [1-9][0-9]* version\\(s\\)\\.\n"

    with _zero_instance_app():
        _update_app(app_id, update_path)

        # Marathon persists app versions indefinitely by ID, so pick a large
        # index here in case the history is long
        returncode, stdout, stderr = exec_command(
            ['dcos', 'marathon', 'app', 'show', '--app-version=-200', app_id])

        assert returncode == 1
        assert stdout == b''
        assert re.fullmatch(expected_stderr, stderr.decode('utf-8'), flags=re.DOTALL)
def add_app(app_path, wait=True):
    """ Add an app, and wait for it to deploy

    Runs ``dcos marathon app add`` and asserts that it exits with 0, prints
    a single "Created deployment <id>" line and writes nothing to stderr.

    :param app_path: path to app's json definition
    :type app_path: str
    :param wait: whether to wait for the deploy
    :type wait: bool
    :rtype: None
    """
    cmd = ['dcos', 'marathon', 'app', 'add', app_path]
    returncode, stdout, stderr = exec_command(cmd)

    assert returncode == 0
    # raw string: "\S" is not a valid escape sequence in a normal str literal
    assert re.fullmatch(r'Created deployment \S+\n', stdout.decode('utf-8'))
    assert stderr == b''

    if wait:
        watch_all_deployments()
def add_pod(pod_path, wait=True):
    """Add a pod, and wait for it to deploy

    Runs ``dcos marathon pod add`` and asserts that it exits with 0, prints
    a single "Created deployment <id>" line and writes nothing to stderr.

    :param pod_path: path to pod's json definition
    :type pod_path: str
    :param wait: whether to wait for the deploy
    :type wait: bool
    :rtype: None
    """
    cmd = ['dcos', 'marathon', 'pod', 'add', pod_path]
    returncode, stdout, stderr = exec_command(cmd)

    assert returncode == 0
    # raw string: "\S" is not a valid escape sequence in a normal str literal
    assert re.fullmatch(r'Created deployment \S+\n', stdout.decode('utf-8'))
    assert stderr == b''

    if wait:
        watch_all_deployments()
def add_group(group_path, wait=True):
    """Add a group, and wait for it to deploy

    Runs ``dcos marathon group add`` and asserts that it exits with 0, prints
    a single "Created deployment <id>" line and writes nothing to stderr.

    :param group_path: path to group's json definition
    :type group_path: str
    :param wait: whether to wait for the deploy
    :type wait: bool
    :rtype: None
    """
    cmd = ['dcos', 'marathon', 'group', 'add', group_path]
    returncode, stdout, stderr = exec_command(cmd)

    assert returncode == 0
    # raw string: "\S" is not a valid escape sequence in a normal str literal
    assert re.fullmatch(r'Created deployment \S+\n', stdout.decode('utf-8'))
    assert stderr == b''

    if wait:
        watch_all_deployments()
def _assert_pod_list_table():
    """Assert that the pod list command prints the expected table.

    Waits for the expected instance counts, runs _POD_LIST_CMD and checks
    the header line, the start of each of the nine rows and the trailing
    empty line.
    """
    _wait_for_instances({'/double-pod': 2, '/good-pod': 1, '/winston': 1})

    returncode, stdout, stderr = exec_command(_POD_LIST_CMD)
    assert returncode == 0
    assert stderr == b''

    stdout_lines = stdout.decode('utf-8').split('\n')

    header = r'ID\+TASKS +INSTANCES +VERSION +STATUS +STATUS SINCE +WAITING *'
    assert re.fullmatch(header, stdout_lines[0])

    expected_prefixes = (
        '/double-pod',
        ' |-thing-1',
        ' |-thing-2',
        '/good-pod',
        ' |-good-container',
        '/winston',
        ' |-the-cat',
        ' |-thing-1',
        ' |-thing-2',
    )
    # rows follow the header, so they start at index 1
    for row, prefix in enumerate(expected_prefixes, start=1):
        assert stdout_lines[row].startswith(prefix)

    assert stdout_lines[10] == ''
    assert len(stdout_lines) == 11
def name_get(value):
    '''
    Return value, stripped of surrounding whitespace, if it is a valid name.

    A name is a single word: a non-empty string that contains no whitespace
    once it has been stripped. A GetError is raised for non-string values,
    for the empty string and for strings that are not a single word.
    '''
    if not isinstance(value, str):
        raise GetError("non-string value '%s' not a valid name" % value)
    if len(value) == 0:
        raise GetError("empty string not a valid name")

    name = value.strip()
    is_single_word = re.fullmatch("[^\\s][^\\s]*", name) is not None
    if is_single_word:
        return name
    raise GetError("given string not a valid name (contains whitespace): %s" % name)
def test_keyword_parameters(self):
    """Pattern methods must accept the subject as the ``string`` keyword."""
    # Issue #20283: Accepting the string keyword parameter.
    pat = re.compile(r'(ab)')
    check = self.assertEqual

    found = pat.match(string='abracadabra', pos=7, endpos=10)
    check(found.span(), (7, 9))

    found = pat.fullmatch(string='abracadabra', pos=7, endpos=9)
    check(found.span(), (7, 9))

    found = pat.search(string='abracadabra', pos=3, endpos=10)
    check(found.span(), (7, 9))

    check(pat.findall(string='abracadabra', pos=3, endpos=10), ['ab'])
    check(pat.split(string='abracadabra', maxsplit=1),
          ['', 'ab', 'racadabra'])

    scanner = pat.scanner(string='abracadabra', pos=3, endpos=10)
    check(scanner.search().span(), (7, 9))
def _create_pattern_from_string(self, string):
parts = re.split("\W", string)
pattern = re.sub("\w+", "{}", string)
pattern_list = []
for part in parts:
if len(part) > 0:
if re.fullmatch("[a-z]+", part):
pattern_list.append("\l")
elif re.fullmatch("[A-Z]+", part):
pattern_list.append("\L")
elif re.fullmatch("[a-zA-Z]+", part):
pattern_list.append("\i")
elif re.fullmatch("\d+", part):
pattern_list.append("\d")
else:
pattern_list.append("\w")
return pattern.format(*pattern_list)
def test_keyword_parameters(self):
    """Pattern methods must accept the subject as the ``string`` keyword."""
    # Issue #20283: Accepting the string keyword parameter.
    pat = re.compile(r'(ab)')
    check = self.assertEqual

    found = pat.match(string='abracadabra', pos=7, endpos=10)
    check(found.span(), (7, 9))

    found = pat.fullmatch(string='abracadabra', pos=7, endpos=9)
    check(found.span(), (7, 9))

    found = pat.search(string='abracadabra', pos=3, endpos=10)
    check(found.span(), (7, 9))

    check(pat.findall(string='abracadabra', pos=3, endpos=10), ['ab'])
    check(pat.split(string='abracadabra', maxsplit=1),
          ['', 'ab', 'racadabra'])

    scanner = pat.scanner(string='abracadabra', pos=3, endpos=10)
    check(scanner.search().span(), (7, 9))
def re_fullmatch(regex, string, flags=0):
    """Emulate python-3.4 re.fullmatch().

    Uses the real ``re.fullmatch`` when the running interpreter provides it
    and falls back to wrapping the pattern for older versions.

    :param regex: regular expression pattern
    :param string: text that must match the pattern in its entirety
    :param flags: ``re`` module flags
    :return: a match object, or None if the whole string does not match
    """
    native = getattr(re, "fullmatch", None)
    if native is not None:
        # The native function also copes with patterns the wrapper below
        # breaks on: leading inline flags such as "(?i)" and VERBOSE patterns
        # ending in a comment (which would swallow the appended ")\Z").
        return native(regex, string, flags=flags)
    return re.match("(?:" + regex + r")\Z", string, flags=flags)
# The issue this function tries to solve is to have a text writer where unicode
# data can be written without decoding error. It should work in the following
# conditions:
# - python 2 & 3, output to terminal
# - python 2 & 3, output to a pipe or shell redirection
# - python 2 & 3, output to a StringIO
#
# When using python 2, if the program output is redirected to a pipe or file,
# the output encoding may be set to 'ascii',
# potentially producing UnicodeEncodeError.
# Redirections do not seem to cause such issues with python 3,
# but explicit utf-8 encoding seems a sensible choice for output that is
# consumed by other programs (e.g. JSON).
def search_for_images(root_path):
    """Yield the paths of image files found below root_path.

    Directories whose full path fully matches one of the
    SEARCH_EXCLUDE_DIRS patterns are pruned from the walk. Files whose full
    path fully matches one of the SEARCH_EXCLUDE_FILES patterns are skipped.
    Of the remaining files, those whose lower-cased extension is listed in
    models.FORMAT_EXTENSIONS are yielded.
    """
    extensions = [extension for x in models.FORMAT_EXTENSIONS for extension in x[1]]
    logging.debug("Looking for extensions: %s", str(extensions))

    for dirname, dirnames, filenames in os.walk(root_path):
        # iterate over a copy: entries are removed from dirnames in place so
        # that os.walk does not descend into them
        for d in dirnames[:]:
            full_dir = os.path.join(dirname, d)
            if any(re.fullmatch(sd, full_dir) is not None
                   for sd in app.config["SEARCH_EXCLUDE_DIRS"]):
                logging.debug("Skipping directory %s", full_dir)
                dirnames.remove(d)

        for filename in filenames:
            full_file = os.path.join(dirname, filename)
            excluded = any(re.fullmatch(sf, full_file) is not None
                           for sf in app.config["SEARCH_EXCLUDE_FILES"])
            if excluded:
                logging.debug("Skipping file %s", full_file)
            elif os.path.splitext(filename)[1].lower()[1:] in extensions:
                yield full_file
def replace_emoji(text, client):
    """Replace ``{SYMBOL}`` markers in text by the matching emoji.

    A marker is one to three characters out of A-Z, 0-9 and '/' in braces.
    The emoji name is the symbol without slashes; a one-character name is
    prefixed with '0' if it is a digit and doubled otherwise. Markers for
    which find_emoji returns None are left untouched.

    :return: the text with the replacements applied, '' if text is None
    """
    if text is None:
        return ''

    output = text
    for symbol in re.findall(r'\{([A-Z0-9/]{1,3})\}', text):
        name = symbol.replace('/', '')
        if len(name) == 1:
            name = ('0' + name) if re.fullmatch('[0-9]', name) else name * 2
        emoji = find_emoji(name, client)
        if emoji is None:
            continue
        output = output.replace('{' + symbol + '}', str(emoji))
    return output
def get_ss(self):
    """Collect connection details for the matching containers into self.ss_set.

    For every container returned by self.get_containers() whose image name
    fully matches ss_image_name and whose command fully matches
    re_pwd_encry, one dict per port mapping is appended to self.ss_set with
    the keys 'ip', 'port', 'password', 'encryption' and 'uri'.
    """
    containers = self.get_containers()
    for container in containers['data']:
        attributes = container['attributes']

        # Only containers running the expected image are of interest.
        if not re.fullmatch(ss_image_name, attributes['image_name']):
            continue

        # Skip containers whose command line cannot be parsed, instead of
        # failing on a None match or reusing values from an earlier container.
        cmd = re.fullmatch(re_pwd_encry, attributes['cmd'])
        if cmd is None:
            continue
        password = cmd.group('password')
        encryption = cmd.group('encryption')

        for ss in attributes['port_mappings']:
            ss_dict = dict()
            # the matched part of the host uses '-' where the address has '.'
            ss_dict['ip'] = re.search(re_ss_ip, ss[0]['host']).group().replace('-', '.')
            ss_dict['port'] = ss[0]['service_port']
            ss_dict['password'] = password
            ss_dict['encryption'] = encryption
            ss_dict['uri'] = generate_ss_uri(ss_dict)
            self.ss_set.append(ss_dict)
async def retro(self, ctx, line_1: str, line_2: str = '', *, line_3: str = ''):
    """Credits: ReinaSakuraba (Reina#0277)

    Post the three lines to the photofunia "retro-wave" effect and send the
    resulting image to the channel as 'retro.jpg'. The first line may only
    contain letters, digits and spaces.
    """
    if not re.fullmatch(r'[A-Za-z0-9 ]+', line_1):
        return await ctx.send('First line only supports alphanumerical characters.')

    data = {
        'bcg': random.randint(1, 5),
        'txt': random.randint(1, 4),
        'text1': line_1,
        'text2': line_2,
        'text3': line_3,
    }

    async with ctx.session.post('https://photofunia.com/effects/retro-wave', data=data) as r:
        txt = await r.text()

    link = re.search(r'(https?.+?.jpg\?download)', txt)
    if link is None:
        # the response did not contain a download link
        return await ctx.send('Could not retrieve the generated image.')

    async with ctx.session.get(link.group(1)) as r:
        await ctx.send(file=discord.File(io.BytesIO(await r.read()), 'retro.jpg'))
def validate_user_name(ctx, param, value: str) -> str:
    """Return ``value`` unchanged if it is an acceptable user name.

    An acceptable name starts with a lowercase letter that is followed by at
    least five characters out of a-z, 0-9 and ``_``.

    Parameters
    ----------
    ctx
        Not used by this function.
    param
        Not used by this function.
    value : str
        Possible user name.

    Returns
    -------
    value : str
        Valid user name.

    Raises
    ------
    click.BadParameter
        If ``value`` does not match the accepted format.
    """
    if re.fullmatch(r'^[a-z][a-z0-9_]{5,}', value) is None:
        raise click.BadParameter(
            'User names can only contain 0-9, a-z and _.')
    return value