def _add_cloud_pocket(pocket):
"""Add a cloud pocket as /etc/apt/sources.d/cloud-archive.list
Note that this overwrites the existing file if there is one.
This function also converts the simple pocket in to the actual pocket using
the CLOUD_ARCHIVE_POCKETS mapping.
:param pocket: string representing the pocket to add a deb spec for.
:raises: SourceConfigError if the cloud pocket doesn't exist or the
requested release doesn't match the current distro version.
"""
apt_install(filter_installed_packages(['ubuntu-cloud-keyring']),
fatal=True)
if pocket not in CLOUD_ARCHIVE_POCKETS:
raise SourceConfigError(
'Unsupported cloud: source option %s' %
pocket)
actual_pocket = CLOUD_ARCHIVE_POCKETS[pocket]
with open('/etc/apt/sources.list.d/cloud-archive.list', 'w') as apt:
apt.write(CLOUD_ARCHIVE.format(actual_pocket))
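For reference, a minimal sketch of the two module-level constants the function above relies on; the names appear in the snippet, but the values here are illustrative assumptions rather than the real charm-helpers definitions:

# Template for the sources.list entry; the pocket is substituted in.
CLOUD_ARCHIVE = """# Ubuntu Cloud Archive
deb http://ubuntu-cloud.archive.canonical.com/ubuntu {} main
"""

# Maps a user-facing pocket name to the actual cloud-archive pocket
# (illustrative entries).
CLOUD_ARCHIVE_POCKETS = {
    'mitaka': 'xenial-updates/mitaka',
    'mitaka/proposed': 'xenial-proposed/mitaka',
}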
def _add_cloud_staging(cloud_archive_release, openstack_release):
"""Add the cloud staging repository which is in
ppa:ubuntu-cloud-archive/<openstack_release>-staging
This function checks that the cloud_archive_release matches the current
    codename of the distro that the charm is being installed on.
:param cloud_archive_release: string, codename for the release.
:param openstack_release: String, codename for the openstack release.
:raises: SourceConfigError if the cloud_archive_release doesn't match the
current version of the os.
"""
_verify_is_ubuntu_rel(cloud_archive_release, openstack_release)
ppa = 'ppa:ubuntu-cloud-archive/{}-staging'.format(openstack_release)
cmd = 'add-apt-repository -y {}'.format(ppa)
_run_with_retries(cmd.split(' '))
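A quick usage sketch (release names are illustrative):

# Verifies the distro codename, then runs with retries:
#   add-apt-repository -y ppa:ubuntu-cloud-archive/mitaka-staging
_add_cloud_staging('xenial', 'mitaka')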
def summaries_to_be_listed_in_waiting_list(
result_log, status_pr, statuses, trigger_str):
summaries = []
for a_status in statuses:
        # "Start" in the results list below is included as a
        # fail-safe.
if not result_log.has_result_of_status(
status=a_status, results=["Succeed", "Start", "Failed"]):
st_str = status_pr.make_tweet_string_from_toot(
a_status, hashtag=trigger_str)
rs_str = "Waiting"
rs_sm = result_log.make_result_and_others_summary(
status_string=st_str, hashtag=trigger_str, result=rs_str)
in_sm = result_log.make_status_summary("inbound", a_status)
rs_sm.update(in_sm)
summaries.append(rs_sm)
return summaries
def cares_about(self, delta):
"""Return True if this observer "cares about" (i.e. wants to be
called) for a this delta.
"""
if (self.entity_id and delta.get_id() and
not re.match(self.entity_id, str(delta.get_id()))):
return False
if self.entity_type and self.entity_type != delta.entity:
return False
if self.action and self.action != delta.type:
return False
if self.predicate and not self.predicate(delta):
return False
return True
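Because this check uses re.match, the entity_id pattern only has to match at the start of the delta's id, not the whole string. A small standalone illustration:

import re

assert re.match('app', 'app/0') is not None        # matches at the start
assert re.match('unit', 'app/unit/0') is None      # re.match anchors at position 0
assert re.search('unit', 'app/unit/0') is not None # re.search scans the whole string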
def _ungroup_go_imports(fname):
with open(fname, 'r+') as f:
content = f.readlines()
out = []
import_block = False
for line in content:
c = line.strip()
if import_block:
if c == '':
continue
elif re.match(END_IMPORT_REGEX, c) is not None:
import_block = False
elif re.match(BEGIN_IMPORT_REGEX, c) is not None:
import_block = True
out.append(line)
f.seek(0)
f.writelines(out)
f.truncate()
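BEGIN_IMPORT_REGEX and END_IMPORT_REGEX are defined elsewhere in the module; a plausible sketch, assuming they delimit a standard Go import block (the exact patterns are an assumption):

# Assumed patterns: a Go import block opens with "import (" and closes with ")".
# The function strips each line before matching, so no leading whitespace here.
BEGIN_IMPORT_REGEX = r'^import\s+\($'
END_IMPORT_REGEX = r'^\)$'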
def module_requirements(requirements_path, module_names, strict_bounds,
conda_format=False):
module_names = set(module_names)
found = set()
module_lines = []
for line in read_requirements(requirements_path,
strict_bounds=strict_bounds):
match = REQ_PATTERN.match(line)
if match is None:
raise AssertionError("Could not parse requirement: '%s'" % line)
name = match.group(1)
if name in module_names:
found.add(name)
if conda_format:
line = _conda_format(line)
module_lines.append(line)
if found != module_names:
raise AssertionError(
"No requirements found for %s." % (module_names - found)
)
return module_lines
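REQ_PATTERN is defined elsewhere; a minimal sketch of a pattern with the behavior this function needs (group(1) must be the bare package name) -- an assumption, not the original definition:

import re

# group(1): package name; group(2): optional version constraint.
REQ_PATTERN = re.compile(r'([^=<>\s]+)\s*([<=>]=?\S*)?')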
def parse(self, response):
"""
???html??????url ?????url??????
?????url???? /question/xxx ?????????????
"""
all_urls = response.css("a::attr(href)").extract()
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
    # Keep only the URLs that start with https.
    all_urls = filter(lambda x: x.startswith("https"), all_urls)
for url in all_urls:
        match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", url)
        if match_obj:
            # Question page: extract the clean question URL and hand it to
            # the question parser.
            request_url = match_obj.group(1)
            yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
            # Debug aid: uncomment to stop after the first question URL.
            # break
        else:
            # Not a question page; keep following the links it contains.
yield scrapy.Request(url, headers=self.headers, callback=self.parse)
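The question regex is worth unpacking; a standalone check of what each group captures:

import re

url = "https://www.zhihu.com/question/12345/answer/67890"
m = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", url)
assert m.group(1) == "https://www.zhihu.com/question/12345"  # clean question URL
assert m.group(2) == "12345"                                 # question id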
def login(self, response):
response_text = response.text
match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
xsrf = ''
if match_obj:
        xsrf = match_obj.group(1)
if xsrf:
post_url = "https://www.zhihu.com/login/phone_num"
post_data = {
"_xsrf": xsrf,
"phone_num": "18487255487",
"password": "ty158917",
"captcha": ""
}
import time
t = str(int(time.time() * 1000))
captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
yield scrapy.Request(captcha_url, headers=self.headers, meta={"post_data":post_data}, callback=self.login_after_captcha)
def get_xsrf():
    # Fetch the _xsrf token from the Zhihu homepage.
response = session.get("https://www.zhihu.com", headers=header)
response_text = response.text
match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
xsrf = ''
if match_obj:
        xsrf = match_obj.group(1)
return xsrf
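Note the re.DOTALL flag: without it, '.' does not cross newlines, and since re.match anchors at the start of the page, the leading '.*' could never reach an _xsrf input on a later line. A tiny illustration:

import re

html = '<html>\n<input name="_xsrf" value="abc123"/>\n</html>'
assert re.match('.*name="_xsrf" value="(.*?)"', html) is None
assert re.match('.*name="_xsrf" value="(.*?)"', html, re.DOTALL).group(1) == 'abc123'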
def zhihu_login(account, password):
    # Zhihu login: pick phone or email login based on the account format.
    if re.match(r"^1\d{10}", account):
        print("logging in with a phone number")
post_url = "https://www.zhihu.com/login/phone_num"
post_data = {
"_xsrf": get_xsrf(),
"phone_num": account,
"password": password,
"captcha":get_captcha()
}
else:
if "@" in account:
            # The account looks like an email address.
            print("logging in with an email address")
post_url = "https://www.zhihu.com/login/email"
post_data = {
"_xsrf": get_xsrf(),
"email": account,
"password": password
}
response_text = session.post(post_url, data=post_data, headers=header)
session.cookies.save()
# get_index()
# is_login()
# get_captcha()
def get_nums(value):
    match_re = re.match(r".*?(\d+).*", value)
if match_re:
nums = int(match_re.group(1))
else:
nums = 0
return nums
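A quick usage sketch: the non-greedy '.*?' prefix lets the first digit run win, and inputs without digits fall back to 0.

assert get_nums("1024 comments") == 1024
assert get_nums("no numbers here") == 0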
def allocate(self, ip_addr, name, platform, cpus, memory, disk):
"""When a node is found, scheduler calls this method with IP address,
name, CPUs, memory and disk available on that node. This method should
return a number indicating number of CPUs to use. If return value is 0,
the node is not used; if the return value is < 0, this allocation is
ignored (next allocation in the 'node_allocations' list, if any, is
applied).
"""
if not re.match(self.ip_rex, ip_addr):
return -1
if (self.platform and not re.search(self.platform, platform)):
return -1
if ((self.memory and memory and self.memory > memory) or
(self.disk and disk and self.disk > disk)):
return 0
if self.cpus > 0:
if self.cpus > cpus:
return 0
return self.cpus
elif self.cpus == 0:
return 0
else:
cpus += self.cpus
if cpus < 0:
return 0
return cpus
def __init__(self, host, tcp_port):
if re.match(r'^\d+[\.\d]+$', host) or re.match(r'^[0-9a-fA-F:]+$', host):
self.addr = host
else:
self.addr = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)[0][4][0]
self.port = int(tcp_port)
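The two patterns are quick shape checks: dotted digits are treated as an IPv4 literal, hex-and-colon strings as an IPv6 literal, and anything else as a hostname to resolve. For example:

import re

assert re.match(r'^\d+[\.\d]+$', '192.168.1.5') is not None   # IPv4-looking literal
assert re.match(r'^[0-9a-fA-F:]+$', 'fe80::1') is not None    # IPv6-looking literal
assert re.match(r'^\d+[\.\d]+$', 'example.com') is None       # hostname: resolve it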
def __init__(self, fqArchiveUrl, filtersDir, outputPrefix, outputUrl, diskSize, diskType, logsPath, container, scriptUrl, tag, cores, mem, preemptible):
super(PipelineStep, self).__init__()
fqFileName = os.path.basename(fqArchiveUrl)
fqInputs = "{fqArchive}:{fqFileName}".format(fqArchive=fqArchiveUrl, fqFileName=fqFileName)
try:
filtersDirContents = subprocess.check_output(["gsutil", "ls", filtersDir])
except subprocess.CalledProcessError as e:
print "ERROR: couldn't get a listing of filter files! -- {reason}".format(reason=e)
exit(-1)
bfInputs = [x for x in filtersDirContents.split('\n') if re.match('^.*\.bf$', x) or re.match('^.*\.txt', x)]
bfInputs.append(fqInputs)
inputs = ",".join(["{url}:{filename}".format(url=x, filename=os.path.basename(x)) for x in bfInputs])
outputs = "{outputPrefix}*:{outDir}".format(outputPrefix=outputPrefix, outDir=outputUrl)
env = "INPUT_FILE={fqFileName},OUTPUT_PREFIX={outputPrefix},FILTERS_LIST={filtersList}".format(fqFileName=fqFileName, outputPrefix=outputPrefix, filtersList=','.join([os.path.basename(x) for x in bfInputs if re.match('^.*\.bf$', x)]))
self._step = PipelineSchema("biobloomcategorizer",
self._pipelinesConfig,
logsPath,
container,
scriptUrl=scriptUrl,
cores=cores,
mem=mem,
diskSize=diskSize,
diskType=diskType,
inputs=inputs,
outputs=outputs,
env=env,
tag=tag,
preemptible=preemptible)
def docker_version():
out = subprocess.check_output(["docker", "-v"])
mo = re.match(br"Docker version (\d+)\.(\d+)\.(\d+)", out)
if mo:
return tuple(map(int, mo.groups()))
die("unable to parse a version number from the output of 'docker -v'")
def _processLength(self, lengthMatch):
"""
Processes the length definition of a netstring.
Extracts and stores in C{self._expectedPayloadSize} the number
representing the netstring size. Removes the prefix
representing the length specification from
C{self._remainingData}.
@raise NetstringParseError: if the received netstring does not
start with a number or the number is bigger than
C{self.MAX_LENGTH}.
@param lengthMatch: A regular expression match object matching
a netstring length specification
@type lengthMatch: C{re.Match}
"""
endOfNumber = lengthMatch.end(1)
startOfData = lengthMatch.end(2)
lengthString = self._remainingData[:endOfNumber]
# Expect payload plus trailing comma:
self._expectedPayloadSize = self._extractLength(lengthString) + 1
self._remainingData = self._remainingData[startOfData:]
def _match_to_dict(
self, match: Match, errors: bool = True) -> Dict[str, Any]:
"""Convert a regular expression Match to a dict of (name, value) for
all PathVars.
Args:
match: A :class:`re.Match`.
errors: If True, raise an exception for validation failure,
otherwise return None.
Returns:
A (name, value) dict.
Raises:
ValueError if any values fail validation.
"""
return match_to_dict(match, self.path_vars, errors)
def find(
self, root: PathLike = None,
recursive: bool = False) -> Sequence[PathInst]:
"""Find all paths in `root` matching this spec.
Args:
root: Directory in which to begin the search.
recursive: Whether to search recursively.
Returns:
A sequence of PathInst.
"""
if root is None:
root = self.default_search_root()
find_results = find(
root, self.pattern, path_types=[self.path_type],
recursive=recursive, return_matches=True)
matches = dict(
(path, self._match_to_dict(match, errors=False))
for path, match in cast(
Sequence[Tuple[str, Match[str]]], find_results))
return [
path_inst(path, match)
for path, match in matches.items()
if match is not None]
def unmatched(match):
"""Return unmatched part of re.Match object."""
start, end = match.span(0)
return match.string[:start]+match.string[end:]
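A quick usage sketch:

import re

m = re.match('abc', 'abcdef')
assert unmatched(m) == 'def'    # everything outside the matched span
m = re.search('cd', 'abcdef')
assert unmatched(m) == 'abef'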
def _truncate_float(matchobj, format_str='0.2g'):
"""Truncate long floats
Args:
matchobj (re.Match object): contains original float
format_str (str): format specifier
    Returns:
        The truncated float, formatted as a string
    """
    if matchobj.group(0):
        return format(float(matchobj.group(0)), format_str)
    # Fall back to the original text so a re.sub callback never returns None.
    return matchobj.group(0)
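This is written as a re.sub replacement callback; a usage sketch:

import re

text = "loss=0.123456, acc=0.987654"
short = re.sub(r'\d+\.\d+', _truncate_float, text)
assert short == "loss=0.12, acc=0.99"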