def run_batch_query(self, query: str,
                    labels: List[str] = None,
                    params: List[dict] = None,
                    chunk_size: int = 1000):
    """Run ``query`` over ``params`` in chunks via ``self.run_in_tx``.

    The query is prefixed with ``UNWIND {params} AS params`` and any
    ``$labels`` placeholder in it is replaced by the colon-prefixed,
    colon-joined ``labels`` (or by an empty string when none are given).
    A generator yielding ``(query, {'params': chunk})`` pairs is handed to
    ``self.run_in_tx``.

    Args:
        query: Query text appended to the ``UNWIND`` prefix.
        labels: Optional labels substituted for ``$labels``.
        params: Parameter dicts to send; ``None`` is treated as an empty
            batch.
        chunk_size: Maximum number of parameter dicts per yielded chunk.

    Returns:
        Whatever ``self.run_in_tx`` returns.
    """
    # ``params`` defaults to None; without this guard ``len(None)`` raised
    # a TypeError as soon as the generator was first advanced.
    rows = params if params is not None else []

    node_labels = (':' + ':'.join(labels)) if labels else ''
    query_template = Template("UNWIND {params} AS params " + query)
    # safe_substitute leaves any other ``$name`` in the query untouched.
    labeled_query = query_template.safe_substitute(labels=node_labels)

    chunk_count = 1

    def batch():
        for start in range(0, len(rows), chunk_size):
            logger.debug('starting chunk %s', start)
            # The value sent back into the generator is only logged.
            result = (yield labeled_query,
                      dict(params=rows[start:start + chunk_size]))
            logger.debug(result)

    return self.run_in_tx(batch(), chunk_count=chunk_count)
# Example source code for the Python class Template()
def generate_menu(name, menu_text):
    """Render ``<name>.sublime-menu`` from its template and return the text.

    The template ``<name>.sublime-menu.template`` is read from the plugin
    directory, its ``$menus`` placeholder is filled with the re-indented
    ``menu_text``, and the result is written next to the template.
    """
    from . import persist

    base_dir = os.path.join(sublime.packages_path(), persist.PLUGIN_DIRECTORY)
    template_path = os.path.join(base_dir, '{}.sublime-menu.template'.format(name))

    with open(template_path, encoding='utf8') as source:
        template_text = source.read()

    # The template itself says how far the menus are indented; the chooser
    # menus are indented to match (except for the first line).
    menu_indent = MENU_INDENT_RE.search(template_text).group(1)
    indented_menus = indent_lines(menu_text, menu_indent)

    rendered = Template(template_text).safe_substitute({'menus': indented_menus})
    output_path = os.path.join(base_dir, '{}.sublime-menu'.format(name))

    with open(output_path, mode='w', encoding='utf8') as target:
        target.write(rendered)

    return rendered
def get_template(self, template_name):
    """Return a ``Template`` built from the first readable candidate file.

    Candidates come from ``self.iter_template_filenames(template_name)``.
    A candidate that does not exist is recorded and skipped; any other
    I/O error propagates.

    Raises:
        TemplateDoesNotExist: when no candidate file could be opened; the
            recorded attempts are passed as ``tried``.
    """
    attempts = []
    for candidate in self.iter_template_filenames(template_name):
        try:
            with io.open(candidate, encoding=settings.FILE_CHARSET) as handle:
                source = handle.read()
        except IOError as exc:
            # Only "file not found" means "try the next candidate".
            if exc.errno != errno.ENOENT:
                raise
            attempts.append((
                Origin(candidate, template_name, self),
                'Source does not exist',
            ))
        else:
            return Template(source)
    raise TemplateDoesNotExist(template_name, tried=attempts, backend=self)
def get_wrapper_template(self, declaration):
    """Return the wrapper ``Template`` specialised for ``declaration``.

    Each option of the declaration is rendered as a quoted signature such
    as ``"(Type name, ...)"`` (or ``"no arguments"``), using
    ``self.TYPE_NAMES`` to map argument types to readable names and
    skipping arguments flagged ``ignore_check``. The signatures are sorted
    by length and substituted into ``self.WRAPPER_TEMPLATE``; placeholders
    not filled here survive in the returned template.
    """
    def describe(option):
        # TODO: this should probably go to THPLongArgsPlugin
        pieces = [
            self.TYPE_NAMES.get(arg['type'], arg['type']) + ' ' + arg['name']
            for arg in option['arguments']
            if not arg.get('ignore_check', False)
        ]
        if not pieces:
            return 'no arguments'
        return '({})'.format(', '.join(pieces))

    signatures = sorted((describe(opt) for opt in declaration['options']), key=len)
    quoted = ['"' + signature + '"' for signature in signatures]

    partially_filled = self.WRAPPER_TEMPLATE.safe_substitute(
        readable_name=declaration['python_name'],
        num_options=len(quoted),
        expected_args=', '.join(quoted),
    )
    return Template(partially_filled)
def declare_methods(self, stateless):
    """Render the method table for the stateful or stateless declarations.

    One table entry is produced per declaration. Flags start as
    ``METH_VARARGS``, gain the declaration's ``method_flags`` and (unless
    ``only_register`` is set) ``METH_KEYWORDS``, and are replaced outright
    by ``override_method_flags`` when present. Entries with ``defined_if``
    are wrapped by ``self.preprocessor_guard``.
    """
    declarations = self.stateless_declarations if stateless else self.declarations
    entry_template = Template(' {"$python_name", (PyCFunction)$name, $flags, NULL},\n')

    entries = []
    for declaration in declarations:
        flag_parts = ['METH_VARARGS']
        if 'method_flags' in declaration:
            flag_parts.append(declaration.get('method_flags'))
        if not declaration.get('only_register'):
            flag_parts.append('METH_KEYWORDS')
        flags = ' | '.join(flag_parts)
        if declaration.get('override_method_flags'):
            flags = declaration['override_method_flags']

        entry = entry_template.substitute(
            python_name=declaration['python_name'],
            name=declaration['name'],
            flags=flags,
        )
        if 'defined_if' in declaration:
            entry = self.preprocessor_guard(entry, declaration['defined_if'])
        entries.append(entry)

    prefix = 'stateless_' if stateless else ''
    return self.TENSOR_METHODS_DECLARATION.substitute(
        methods=''.join(entries), stateless=prefix)
def is_available(room_email,start_time,end_time):
    """Return True when the room's reported busy status is "Free".

    Fills ``getavailibility_template.xml`` with the room e-mail and the
    time window, POSTs it to the module-level ``url`` using NTLM
    authentication, and inspects the XML reply. The status is the text of
    the last ``BusyType`` element found (default "Free"); a reply
    containing a ``faultcode`` element is logged to stderr and reported
    as "N/A".
    """
    # Use a context manager so the template file handle is always closed
    # (the original left it open).
    with open("getavailibility_template.xml", "r") as template_file:
        xml_template = template_file.read()
    xml = Template(xml_template)
    headers = {}
    headers["Content-type"] = "text/xml; charset=utf-8"
    data=unicode(xml.substitute(email=room_email,starttime=start_time,endtime=end_time)).strip()
    response=requests.post(url,headers = headers, data= data, auth= HttpNtlmAuth(user,password))
    tree = ET.fromstring(response.text.encode('utf-8'))

    status = "Free"
    # arrgh, namespaces!!
    elems=tree.findall(".//{http://schemas.microsoft.com/exchange/services/2006/types}BusyType")
    for elem in elems:
        status=elem.text

    elems=tree.findall(".//faultcode")
    if elems:
        sys.stderr.write("Error occured\n")
        sys.stderr.write("tree: "+str(tree)+"\n")
        sys.stderr.write("response: "+response.text.encode('utf-8')+"\n")
        status= "N/A"

    sys.stderr.write("Room status: "+str(status)+"\n")
    return (status == "Free")
# remote_connection.py file source
# Project: devsecops-example-helloworld
# Author: boozallen
# Project source
# File source
# Views: 30
# Favorites: 0
# Likes: 0
# Comments: 0
def execute(self, command, params):
    """
    Send a command to the remote server.

    Any path substitutions required for the URL mapped to the command
    should be included in the command parameters.

    :Args:
     - command - A string specifying the command to execute.
     - params - A dictionary of named parameters to send with the command as
       its JSON payload.
    """
    entry = self._commands[command]
    assert entry is not None, 'Unrecognised command %s' % command
    payload = utils.dump_json(params)
    # entry[1] is the URL path template, entry[0] is passed to _request first.
    resolved_path = string.Template(entry[1]).substitute(params)
    target_url = '%s%s' % (self._url, resolved_path)
    return self._request(entry[0], target_url, body=payload)
def __init__(self, fmt=None, datefmt="%Y-%m-%d %H:%M:%S", exceptionfmt="Traceback (most recent call last):\n%(traceback)s\n%(classname)s: %(message)s"):
    """Store the format strings, expanding colour placeholders.

    ``$name`` placeholders found in ``SHELL_COLORS`` are substituted into
    both format strings. When stdout is not a terminal, the ANSI colour
    sequences are stripped again.

    Args:
        fmt: Record format string; ``None`` falls back to ``"%(message)s"``.
        datefmt: Date format string, stored unchanged.
        exceptionfmt: Format string used for exceptions.
    """
    # set standard Formatter values
    self.datefmt = datefmt

    # Template(None).safe_substitute() raises TypeError, so the declared
    # default of None used to crash. NOTE(review): "%(message)s" mirrors
    # logging.Formatter's own default - confirm this is the intended one.
    if fmt is None:
        fmt = "%(message)s"

    # set colors
    self._fmt = Template(fmt).safe_substitute(SHELL_COLORS)
    self._exceptionfmt = Template(exceptionfmt).safe_substitute(SHELL_COLORS)

    # do we have a tty?
    self.useColors = bool(sys.stdout.isatty() or os.isatty(sys.stdout.fileno()))

    # remove all colors
    if not self.useColors:
        # Same pattern text as before, written without invalid string
        # escapes ("\[" and "\d" in a non-raw literal).
        ansi_color = "\033" r"\[(\d+|;)+m"
        self._fmt = re.sub(ansi_color, "", self._fmt)
        self._exceptionfmt = re.sub(ansi_color, "", self._exceptionfmt)
def __init__(self, fmt=None, datefmt="%Y-%m-%d %H:%M:%S", exceptionfmt="Traceback (most recent call last):\n%(traceback)s\n%(classname)s: %(message)s"):
    """Store the format strings, expanding colour placeholders.

    ``$name`` placeholders found in ``SHELL_COLORS`` are substituted into
    both format strings. When stdout is not a terminal, the ANSI colour
    sequences are stripped again.

    Args:
        fmt: Record format string; ``None`` falls back to ``"%(message)s"``.
        datefmt: Date format string, stored unchanged.
        exceptionfmt: Format string used for exceptions.
    """
    # set standard Formatter values
    self.datefmt = datefmt

    # Template(None).safe_substitute() raises TypeError, so the declared
    # default of None used to crash. NOTE(review): "%(message)s" mirrors
    # logging.Formatter's own default - confirm this is the intended one.
    if fmt is None:
        fmt = "%(message)s"

    # set colors
    self._fmt = Template(fmt).safe_substitute(SHELL_COLORS)
    self._exceptionfmt = Template(exceptionfmt).safe_substitute(SHELL_COLORS)

    # do we have a tty?
    self.useColors = bool(sys.stdout.isatty() or os.isatty(sys.stdout.fileno()))

    # remove all colors
    if not self.useColors:
        # Same pattern text as before, written without invalid string
        # escapes ("\[" and "\d" in a non-raw literal).
        ansi_color = "\033" r"\[(\d+|;)+m"
        self._fmt = re.sub(ansi_color, "", self._fmt)
        self._exceptionfmt = re.sub(ansi_color, "", self._exceptionfmt)
def _get_submodule_code(op):
    """Return the ``__device__`` function source for ``op``.

    The parameter list pairs each name in ``op.param_names`` with the C
    type of the matching entry in ``op.types`` (passed by reference). The
    body starts with ``inN_type`` typedefs for the first ``op.nin`` types
    and ``outN_type`` typedefs for the rest, followed by ``op.operation``.
    """
    ctype_names = [_dtype_to_ctype[t] for t in op.types]

    parameters = ', '.join(
        '%s &%s' % (ctype, name)
        for name, ctype in zip(op.param_names, ctype_names))

    input_typedefs = [
        'typedef %s in%d_type;\n' % (ctype, index)
        for index, ctype in enumerate(ctype_names[:op.nin])
    ]
    output_typedefs = [
        'typedef %s out%d_type;\n' % (ctype, index)
        for index, ctype in enumerate(ctype_names[op.nin:])
    ]
    typedecl = ''.join(input_typedefs + output_typedefs)

    code_template = string.Template('''
__device__ void ${name}(${parameters}) {
${typedecl}
${operation};
}
''')
    rendered = code_template.substitute(
        name=op.name,
        parameters=parameters,
        operation=op.operation,
        typedecl=typedecl)
    return rendered + '\n'
def _get_pre_code(in_vars, out_vars, operation):
    """Return the source of the ``_pre_map`` ``__device__`` function.

    Input variables become the parameters (``<ctype> v<num>``), output
    variables are declared as locals, ``operation`` forms the body, and
    the first output variable is returned with its C type as the return
    type.
    """
    first_out = out_vars[0]

    param_list = ', '.join(
        '%s v%s' % (_dtype_to_ctype[var.ty], var.num) for var in in_vars)
    local_decls = ''.join(
        '%s v%s;\n' % (_dtype_to_ctype[var.ty], var.num) for var in out_vars)

    code_template = string.Template('''
__device__ ${return_type} _pre_map(${in_params}) {
${out_params}
${operation};
return ${return_var};
}
''')
    return code_template.substitute(
        return_type=_dtype_to_ctype[first_out.ty],
        in_params=param_list,
        out_params=local_decls,
        operation=operation,
        return_var='v%d' % first_out.num)
def process_expansions(dct):
    """Expand ``$name`` placeholders in the values of ``dct`` in place.

    String values are run through ``string.Template.safe_substitute``
    using the enclosing dict merged with ``os.environ`` (environment
    variables win on a name clash). Lists are expanded element by
    element. Nested dicts are expanded against themselves rather than the
    top-level dict, and are returned as new dicts. Integers, booleans and
    any other type are left unchanged.

    Because ``dct`` is updated as it is walked, later values see the
    already-expanded form of earlier ones.
    """
    def expand(val, scope):
        if isinstance(val, six.integer_types) or isinstance(val, bool):
            return val
        if isinstance(val, six.string_types):
            # safe_substitute only reads the mapping, so a shallow merge
            # is enough; the original deep-copied the whole dict for
            # every single string value.
            mapping = dict(scope)
            mapping.update(os.environ)
            return string.Template(val).safe_substitute(mapping)
        if isinstance(val, list):
            return [expand(item, scope) for item in val]
        if isinstance(val, dict):
            return {k: expand(v, val) for k, v in six.iteritems(val)}
        return val

    # Only existing keys are reassigned, so the dict size never changes
    # during iteration.
    for key, val in six.iteritems(dct):
        dct[key] = expand(val, dct)
def translate(self):
    """Walk the document and render ``self.output`` from the template.

    A translator built by ``self.translator_class`` visits the document,
    and every attribute named in ``self.visitor_attributes`` is copied
    from it onto this object. The template named by
    ``self.document.settings.template`` is then read as UTF-8 (falling
    back to the same name under ``self.default_template_path`` when it
    cannot be opened) and filled from ``self.parts``.
    """
    visitor = self.translator_class(self.document)
    self.document.walkabout(visitor)
    # copy parts
    for part in self.visitor_attributes:
        setattr(self, part, getattr(visitor, part))
    # get template string from file
    template_name = self.document.settings.template
    try:
        template_file = open(template_name, 'rb')
    except IOError:
        template_file = open(os.path.join(self.default_template_path,
                                          template_name), 'rb')
    # The context manager closes the file even if reading or decoding
    # fails; bytes.decode() replaces the Python-2-only unicode() call and
    # gives the same result on Python 2.
    with template_file:
        template = string.Template(template_file.read().decode('utf-8'))
    # fill template
    self.assemble_parts()  # create dictionary of parts
    self.output = template.substitute(self.parts)
def substitute(template, context):
    """
    Fill ``$name`` placeholders in a string.

    Placeholders with no matching key in ``context`` are left as they are.

    Args:
        template (str): The string to perform the substitution on.
        context (dict): A mapping of keys to values for substitution.

    Returns:
        str: The substituted string.
    """
    import string

    return string.Template(template).safe_substitute(context)
def translate(self):
    """Walk the document and render ``self.output`` from the template.

    A translator built by ``self.translator_class`` visits the document,
    and every attribute named in ``self.visitor_attributes`` is copied
    from it onto this object. The template named by
    ``self.document.settings.template`` is then read as UTF-8 (falling
    back to the same name under ``self.default_template_path`` when it
    cannot be opened) and filled from ``self.parts``.
    """
    visitor = self.translator_class(self.document)
    self.document.walkabout(visitor)
    # copy parts
    for part in self.visitor_attributes:
        setattr(self, part, getattr(visitor, part))
    # get template string from file
    template_name = self.document.settings.template
    try:
        template_file = open(template_name, 'rb')
    except IOError:
        template_file = open(os.path.join(self.default_template_path,
                                          template_name), 'rb')
    # The context manager closes the file even if reading or decoding
    # fails; bytes.decode() replaces the Python-2-only unicode() call and
    # gives the same result on Python 2.
    with template_file:
        template = string.Template(template_file.read().decode('utf-8'))
    # fill template
    self.assemble_parts()  # create dictionary of parts
    self.output = template.substitute(self.parts)
def init_with_genesis(smoketest_genesis):
    """Write the genesis JSON and run the geth ``init`` command on it.

    ``smoketest_genesis`` is dumped as JSON to ``GENESIS_PATH``. The
    binary and data directory come from the ``RST_GETH_BINARY`` and
    ``RST_DATADIR`` environment variables.

    Returns:
        tuple: ``(stdout, stderr)`` captured from the process.

    Raises:
        KeyError: if one of the environment variables is missing.
        AssertionError: if the process exits with a non-zero status.
    """
    # json.dump() writes text, so the file must be opened in text mode;
    # 'wb' raises TypeError on Python 3.
    with open(GENESIS_PATH, 'w') as handler:
        json.dump(smoketest_genesis, handler)

    cmd = '$RST_GETH_BINARY --datadir $RST_DATADIR init'
    args = shlex.split(
        Template(cmd).substitute(os.environ)
    )
    # Append the path as its own argument so spaces, quotes, backslashes
    # or '$' in it are not reinterpreted by Template or shlex.
    args.append(GENESIS_PATH)

    init = subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out, err = init.communicate()
    assert init.returncode == 0
    return (out, err)
def format_index_html(self, dirname):
    """Return an HTML directory listing for ``dirname``.

    Every entry of the directory becomes a list item linking to its own
    name; directories get a trailing ``/``. A byte-string ``dirname`` is
    decoded with the filesystem encoding first.

    Names are HTML-escaped before being written into the page, so a file
    name containing ``<``, ``&`` or ``"`` cannot inject markup.
    """
    # Local import: escape() exists under this name on Python 2 and 3.
    from xml.sax.saxutils import escape

    INDEX_TEMPLATE = u'''
<html>
<title>Directory listing for $dirname</title>
<body>
<h2>Directory listing for $dirname</h2>
<hr>
<ul>
$html
</ul>
<hr>
</body></html>
'''
    # ``bytes`` is ``str`` on Python 2, so this matches the old
    # ``not isinstance(dirname, unicode)`` check for string inputs while
    # also running on Python 3.
    if isinstance(dirname, bytes):
        dirname = dirname.decode(sys.getfilesystemencoding())

    items = []
    for name in os.listdir(dirname):
        fullname = os.path.join(dirname, name)
        suffix = u'/' if os.path.isdir(fullname) else u''
        # The name is used both inside an attribute and as text, so the
        # double quote is escaped as well.
        safe_name = escape(name, {u'"': u'&quot;'})
        items.append(u'<li><a href="%s%s">%s%s</a>\r\n' % (safe_name, suffix, safe_name, suffix))
    html = u''.join(items)

    return string.Template(INDEX_TEMPLATE).substitute(dirname=escape(dirname), html=html)
def execute(self, command, params):
    """
    Send a command to the remote server.

    Any path substitutions required for the URL mapped to the command
    should be included in the command parameters.

    :Args:
     - command - A string specifying the command to execute.
     - params - A dictionary of named parameters to send with the command as
       its JSON payload.
    """
    entry = self._commands[command]
    assert entry is not None, 'Unrecognised command %s' % command
    payload = utils.dump_json(params)
    # entry[1] is the URL path template, entry[0] is passed to _request first.
    resolved_path = string.Template(entry[1]).substitute(params)
    target_url = '%s%s' % (self._url, resolved_path)
    return self._request(entry[0], target_url, body=payload)
def __init__(self, detail=None, headers=None, comment=None,
             body_template=None, **kw):
    """Initialise both the response side and the exception side.

    The response status is built from ``self.code`` and ``self.title``;
    extra keyword arguments go to ``Response.__init__``. ``headers``, when
    given, are appended to the response headers. A custom
    ``body_template`` replaces the stored template and its compiled
    ``Template`` object. When ``self.empty_body`` is true, the content
    type and content length are removed.
    """
    status_line = '%s %s' % (self.code, self.title)
    Response.__init__(self, status=status_line, **kw)
    Exception.__init__(self, detail)

    if headers:
        self.headers.extend(headers)

    self.detail = detail
    self.comment = comment

    if body_template is not None:
        self.body_template = body_template
        self.body_template_obj = Template(body_template)

    if self.empty_body:
        del self.content_type
        del self.content_length
def handle(self, *options):
    """Create a new project directory from the bundled project template.

    The first option is the project name. The template's ``src`` tree is
    copied as-is; every ``*.py`` file at the top of the template directory
    is rendered with the project name, its PascalCase prefix, and the AWS
    account id and role ARN typed in by the user.
    """
    if not options:
        self.show_usage('You must provide a project name')

    project_name = options[0]
    template_dir = os.path.join(noopy.__path__[0], 'project_template')

    os.mkdir(project_name)
    shutil.copytree(os.path.join(template_dir, 'src'), '{}/src'.format(project_name))

    context = {
        'project_name': project_name,
        'lambda_prefix': to_pascal_case(project_name),
    }
    context['account_id'] = raw_input('Input aws account id > ').strip()
    context['role_arn'] = raw_input('Input role arn to be granted to lambda function > ').strip()

    for source_path in glob.glob('{}/*.py'.format(template_dir)):
        with open(source_path, 'r') as source:
            raw_text = source.read()
        target_path = os.path.join(project_name, os.path.basename(source_path))
        with open(target_path, 'w') as target:
            target.write(Template(raw_text).substitute(**context))

    # Parenthesised single argument prints the same on Python 2 and 3.
    print('Project "{}" created'.format(project_name))
def start_scrapy_project(project_name):
    """Return the default scrapy files for a new portia project.

    Every file found by ``find_files`` is rendered with the project name
    and its camel-cased form. A ``.tmpl`` suffix is dropped from the path,
    any path ending in ``scrapy.cfg`` is stored as plain ``scrapy.cfg``,
    and a generated ``setup.py`` is added.

    Returns:
        dict: output path mapped to rendered file contents.
    """
    if PY2:
        project_name = encode(project_name)

    tmpl_suffix = '.tmpl'
    rendered_files = {}
    for source_path, raw_contents in find_files(project_name).items():
        rendered = string.Template(raw_contents).substitute(
            project_name=project_name,
            ProjectName=string_camelcase(project_name)
        )
        target_path = source_path
        if target_path.endswith(tmpl_suffix):
            target_path = target_path[:-len(tmpl_suffix)]
        if target_path.endswith('scrapy.cfg'):
            target_path = 'scrapy.cfg'
        rendered_files[target_path] = rendered

    rendered_files['setup.py'] = SETUP(project_name)
    return rendered_files
def _bbox_2_wkt(bbox, srid):
    '''
    Given a bbox dictionary, return a WKT polygon element for it,
    transformed into the database\'s CRS if necessary.

    The database SRID is read from the ``ckan.spatial.srid`` config option
    (default 4326). When ``srid`` is set and differs from it, the element
    is wrapped in ``ST_Transform``; otherwise it is created directly with
    the database SRID.

    returns e.g. WKTElement("POLYGON ((2 0, 2 1, 7 1, 7 0, 2 0))", 4326)
    '''
    db_srid = int(config.get('ckan.spatial.srid', '4326'))

    corners = {key: bbox[key] for key in ('minx', 'miny', 'maxx', 'maxy')}
    wkt = Template(
        'POLYGON (($minx $miny, $minx $maxy, $maxx $maxy, $maxx $miny, $minx $miny))'
    ).substitute(corners)

    if not srid or srid == db_srid:
        return WKTElement(wkt, db_srid)

    # Input geometry needs to be transformed to the one used on the database
    return ST_Transform(WKTElement(wkt, srid), db_srid)
def show_kernel_error(self, error):
    """Show kernel initialization errors.

    The error text is prepared for HTML display (line endings become
    ``<br>``, hyphens become non-breaking hyphens), placed into the
    ``KERNEL_ERROR`` page template and set as this widget's HTML.
    """
    # Turn the text's own end-of-line sequence into HTML line breaks
    line_ending = sourcecode.get_eol_chars(error)
    if line_ending:
        error = error.replace(line_ending, '<br>')

    # Don't break lines in hyphens
    # From http://stackoverflow.com/q/7691569/438386
    error = error.replace('-', '‑')

    heading = _("An error occurred while starting the kernel")
    page = Template(KERNEL_ERROR).substitute(
        css_path=CSS_PATH,
        message=heading,
        error=error)
    self.setHtml(page)
def authenticate(self, username, password):
    """Check ``username``/``password`` against LDAP and return a ``User``.

    The user entry is looked up over a connection bound with the
    configured service account, using the ``user_query`` setting with
    ``$username`` filled in. A second connection then tries to bind as
    that entry with the supplied password.

    Returns:
        A ``User`` carrying the username, the entry's ``cn`` and the ``CN``
        of each ``memberof`` value on success; ``None`` when the password
        is empty, the user is not found, or the bind fails.
    """
    # An LDAP simple bind with an empty password is an "unauthenticated
    # bind", which many servers accept - so it must never count as a
    # successful login.
    if not password:
        return None

    server = settings.get('authentication.config.server')
    port = settings.get('authentication.config.port')
    bind_user = settings.get('authentication.config.bind_user')
    bind_password = settings.get('authentication.config.bind_password')
    query = Template(settings.get('authentication.config.user_query'))

    # Escape the characters that are special in an LDAP search filter
    # (RFC 4515) so the username cannot alter the query. The backslash is
    # replaced first so the escapes themselves are not re-escaped.
    # NOTE(review): assumes user_query is a search filter, not a DN - confirm.
    safe_username = username
    for char, escaped in (('\\', r'\5c'), ('*', r'\2a'), ('(', r'\28'),
                          (')', r'\29'), ('\x00', r'\00')):
        safe_username = safe_username.replace(char, escaped)

    with simpleldap.Connection(server, port, bind_user, bind_password) as conn:
        try:
            user = conn.get(query.substitute(username=safe_username))
        except simpleldap.ObjectNotFound:
            return None

    with simpleldap.Connection(server, port) as conn:
        if conn.authenticate(user.dn, password):
            return User(
                username=username,
                name=user.first('cn'),
                groups=[self._split_ldap_spec(x)['CN'] for x in user.get('memberof', [])]
            )
    return None
def execute_for_backends(self, cmd, pxname, svname, wait_for_status = None):
    """
    Run a command on one backend, or on every discovered backend when
    ``pxname`` is None.

    ``cmd`` is a template in which ``$pxname`` and ``$svname`` are filled
    in per backend. When the server's state cannot be found and either
    ``fail_on_not_found`` or ``wait`` is set, the module is failed. With
    ``wait`` set, each command is followed by a wait for
    ``wait_for_status``.
    """
    # Discover backends if none are given
    backends = [pxname] if pxname is not None else self.discover_all_backends()
    command_template = Template(cmd)

    # Run the command for each requested backend
    for backend in backends:
        # Fail when backends were not found
        state = self.get_state_for(backend, svname)
        must_exist = self.fail_on_not_found or self.wait
        if must_exist and state is None:
            self.module.fail_json(msg="The specified backend '%s/%s' was not found!" % (backend, svname))

        self.execute(command_template.substitute(pxname=backend, svname=svname))
        if self.wait:
            self.wait_until_status(backend, svname, wait_for_status)
def create_capability_files(template_path, themes_path, map_path, fonts_path, use_debug, shapepath):
    """Write one ``SeaChart_<theme>.map`` file per entry in ``themes_path``.

    The template ``SeaChart_THEME.map`` from ``template_path`` is filled
    with the dictionary returned by ``get_dictionary`` for each theme
    (the theme name is the directory entry without its file suffix). A
    truthy ``shapepath`` overrides the ``SHAPEPATH`` key. With
    ``use_debug`` set, ``debug_template`` formatted with the theme name is
    passed on as the debug string.
    """
    # Context managers close both the template and every output file; the
    # original never closed them, so written data was not reliably flushed.
    with open(os.path.join(template_path, "SeaChart_THEME.map"), 'r') as template_file:
        template = Template(template_file.read())

    for theme in os.listdir(themes_path):
        # Remove file suffix
        theme = os.path.splitext(theme)[0]

        debug_string = ""
        if use_debug:
            debug_string = str.format(debug_template, theme)

        d = get_dictionary(theme, map_path, fonts_path, debug_string)
        if shapepath:
            d['SHAPEPATH'] = shapepath

        # Render before opening the target so a missing placeholder does
        # not leave an empty map file behind.
        rendered = template.substitute(d)
        with open(os.path.join(map_path, "SeaChart_" + theme + ".map"), 'w') as fileout:
            fileout.write(rendered)
def create_legend_files(template_path, themes_path, map_path, fonts_path, use_debug):
    """Write one ``SeaChart_Legend_<theme>.map`` file per entry in ``themes_path``.

    The template ``SeaChart_Legend_THEME.map`` from ``template_path`` is
    filled with the dictionary returned by ``get_dictionary`` for each
    theme (the theme name is the directory entry without its file
    suffix). Files go into the directory returned by
    ``dirutils.force_sub_dir(map_path, "legends")``. With ``use_debug``
    set, ``debug_template`` formatted with the theme name is passed on as
    the debug string.
    """
    # Context managers close both the template and every output file; the
    # original never closed them, so written data was not reliably flushed.
    with open(os.path.join(template_path, "SeaChart_Legend_THEME.map"), 'r') as template_file:
        template = Template(template_file.read())

    for theme in os.listdir(themes_path):
        # Remove file suffix
        theme = os.path.splitext(theme)[0]

        debug_string = ""
        if use_debug:
            debug_string = str.format(debug_template, theme)

        d = get_dictionary(theme, map_path, fonts_path, debug_string)
        legend_path = dirutils.force_sub_dir(map_path, "legends")

        # Render before opening the target so a missing placeholder does
        # not leave an empty legend file behind.
        rendered = template.substitute(d)
        with open(os.path.join(legend_path, "SeaChart_Legend_" + theme + ".map"), 'w') as fileout:
            fileout.write(rendered)
def write_html_link_index(out_dir, link):
    """Render the per-link ``index.html`` into ``out_dir``.

    The page template is filled from ``link`` and ``link['latest']``, with
    fallbacks for an empty type or tags, formatted bookmark and update
    times, and default ``archive_org`` and ``wget`` targets. The written
    file is then passed to ``chmod_file``.
    """
    def as_minutes(timestamp):
        # Timestamps may be stored as strings, hence the float() conversion.
        return datetime.fromtimestamp(float(timestamp)).strftime('%Y-%m-%d %H:%M')

    with open(LINK_INDEX_TEMPLATE, 'r', encoding='utf-8') as template_file:
        template_text = template_file.read()

    index_path = os.path.join(out_dir, 'index.html')

    print(' ? Updating: index.html')

    with open(index_path, 'w', encoding='utf-8') as index_file:
        latest = link['latest']
        # Later keys deliberately override same-named keys from ``link``
        # and ``latest``.
        context = {
            **link,
            **latest,
            'type': link['type'] or 'website',
            'tags': link['tags'] or 'untagged',
            'bookmarked': as_minutes(link['timestamp']),
            'updated': as_minutes(link['updated']),
            'archive_org': latest.get('archive_org') or 'https://web.archive.org/save/{}'.format(link['url']),
            'wget': latest.get('wget') or link['domain'],
        }
        index_file.write(Template(template_text).substitute(context))

    chmod_file(index_path)
def execute(self, command, params):
    """
    Send a command to the remote server.

    Any path substitutions required for the URL mapped to the command
    should be included in the command parameters.

    :Args:
     - command - A string specifying the command to execute.
     - params - A dictionary of named parameters to send with the command as
       its JSON payload.
    """
    entry = self._commands[command]
    assert entry is not None, 'Unrecognised command %s' % command
    payload = utils.dump_json(params)
    # entry[1] is the URL path template, entry[0] is passed to _request first.
    resolved_path = string.Template(entry[1]).substitute(params)
    target_url = '%s%s' % (self._url, resolved_path)
    return self._request(entry[0], target_url, body=payload)
def test_keyword_arguments_safe(self):
    """Check keyword handling of ``Template.safe_substitute``.

    Keywords may be combined with a positional mapping and win over it,
    a placeholder named ``mapping`` can be filled by keyword, and passing
    two positional mappings raises ``TypeError``.
    """
    likes = Template('$who likes $what')
    self.assertEqual(likes.safe_substitute(who='tim', what='ham'),
                     'tim likes ham')
    self.assertEqual(likes.safe_substitute(dict(who='tim'), what='ham'),
                     'tim likes ham')
    self.assertEqual(
        likes.safe_substitute(dict(who='fred', what='kung pao'),
                              who='tim', what='ham'),
        'tim likes ham')

    mapping_tpl = Template('the mapping is $mapping')
    self.assertEqual(
        mapping_tpl.safe_substitute(dict(foo='none'), mapping='bozo'),
        'the mapping is bozo')
    self.assertEqual(
        mapping_tpl.safe_substitute(dict(mapping='one'), mapping='two'),
        'the mapping is two')

    positional = dict(mapping='one')
    self.assertRaises(TypeError, mapping_tpl.substitute, positional, {})
    self.assertRaises(TypeError, mapping_tpl.safe_substitute, positional, {})