def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
python类Error()的实例源码
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def __getitem__(self, item):
try:
return self.cfg.get(self.def_sec, item)
except configparser.Error:
return None
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def init_logger(name):
"""Initialize logger."""
log_conf_file = pkg_resources.resource_stream(
'treadmill',
'/logging/{name}'.format(name=name)
)
try:
logging.config.fileConfig(
codecs.getreader('utf-8')(log_conf_file)
)
except configparser.Error:
with tempfile.NamedTemporaryFile(delete=False) as f:
traceback.print_exc(file=f)
click.echo('Error parsing log conf: {name}'.format(name=name),
err=True)
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def raw_config_parse(config_filename):
"""Returns the parsed INI config contents.
Each section name is a top level key.
:returns: A dict with keys for each profile found in the config
file and the value of each key being a dict containing name
value pairs found in that profile.
:raises: ConfigNotFound, ConfigParseError
"""
config = {}
path = config_filename
if path is not None:
path = os.path.expandvars(path)
path = os.path.expanduser(path)
if not os.path.isfile(path):
raise kscore.exceptions.ConfigNotFound(path=path)
cp = configparser.RawConfigParser()
try:
cp.read(path)
except configparser.Error:
raise kscore.exceptions.ConfigParseError(path=path)
else:
for section in cp.sections():
config[section] = {}
for option in cp.options(section):
config_value = cp.get(section, option)
if config_value.startswith('\n'):
# Then we need to parse the inner contents as
# hierarchical. We support a single level
# of nesting for now.
try:
config_value = _parse_nested(config_value)
except ValueError:
raise kscore.exceptions.ConfigParseError(
path=path)
config[section][option] = config_value
return config
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def __init__(self, config_file=None):
if config_file:
try:
# ConfigParser doesn't do a good job of
# error reporting, so we'll just try to open
# the file
open(config_file, "r").close()
except (EnvironmentError) as err:
raise PkglintConfigException(
_("unable to read config file: {0} ").format(
err))
try:
if six.PY2:
self.config = configparser.SafeConfigParser(
defaults)
else:
# SafeConfigParser has been renamed to
# ConfigParser in Python 3.2.
self.config = configparser.ConfigParser(
defaults)
if not config_file:
if six.PY2:
self.config.readfp(
open("/usr/share/lib/pkg/pkglintrc"))
else:
self.config.read_file(
open("/usr/share/lib/pkg/pkglintrc"))
self.config.read(
[os.path.expanduser("~/.pkglintrc")])
else:
self.config.read(config_file)
# sanity check our config by looking for a known key
self.config.get("pkglint", "log_level")
except configparser.Error as err:
raise PkglintConfigException(
_("missing or corrupt pkglintrc file "
"{config_file}: {err}").format(**locals()))
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def extract_wininst_cfg(dist_filename):
"""Extract configuration data from a bdist_wininst .exe
Returns a configparser.RawConfigParser, or None
"""
f = open(dist_filename, 'rb')
try:
endrec = zipfile._EndRecData(f)
if endrec is None:
return None
prepended = (endrec[9] - endrec[5]) - endrec[6]
if prepended < 12: # no wininst data here
return None
f.seek(prepended - 12)
tag, cfglen, bmlen = struct.unpack("<iii", f.read(12))
if tag not in (0x1234567A, 0x1234567B):
return None # not a valid tag
f.seek(prepended - (12 + cfglen))
init = {'version': '', 'target_version': ''}
cfg = configparser.RawConfigParser(init)
try:
part = f.read(cfglen)
# Read up to the first null byte.
config = part.split(b'\0', 1)[0]
# Now the config is in bytes, but for RawConfigParser, it should
# be text, so decode it.
config = config.decode(sys.getfilesystemencoding())
cfg.readfp(six.StringIO(config))
except configparser.Error:
return None
if not cfg.has_section('metadata') or not cfg.has_section('Setup'):
return None
return cfg
finally:
f.close()
def __init__(self, client):
# type: (Client) -> None
self._client = client
self.marshal = lambda obj: json.dumps(obj)
self.demarshal = lambda obj: json.loads(obj)
response = self._client.get_storage()
if response['result'] == 'success':
self.state_ = response['storage']
else:
raise StateHandlerError("Error initializing state: {}".format(str(response)))
def put(self, key, value):
# type: (Text, Any) -> None
self.state_[key] = self.marshal(value)
response = self._client.update_storage({'storage': {key: self.state_[key]}})
if response['result'] != 'success':
raise StateHandlerError("Error updating state: {}".format(str(response)))
def raw_config_parse(config_filename, parse_subsections=True):
"""Returns the parsed INI config contents.
Each section name is a top level key.
:param config_filename: The name of the INI file to parse
:param parse_subsections: If True, parse indented blocks as
subsections that represent their own configuration dictionary.
For example, if the config file had the contents::
s3 =
signature_version = s3v4
addressing_style = path
The resulting ``raw_config_parse`` would be::
{'s3': {'signature_version': 's3v4', 'addressing_style': 'path'}}
If False, do not try to parse subsections and return the indented
block as its literal value::
{'s3': '\nsignature_version = s3v4\naddressing_style = path'}
:returns: A dict with keys for each profile found in the config
file and the value of each key being a dict containing name
value pairs found in that profile.
:raises: ConfigNotFound, ConfigParseError
"""
config = {}
path = config_filename
if path is not None:
path = os.path.expandvars(path)
path = os.path.expanduser(path)
if not os.path.isfile(path):
raise botocore.exceptions.ConfigNotFound(path=path)
cp = configparser.RawConfigParser()
try:
cp.read(path)
except configparser.Error:
raise botocore.exceptions.ConfigParseError(path=path)
else:
for section in cp.sections():
config[section] = {}
for option in cp.options(section):
config_value = cp.get(section, option)
if parse_subsections and config_value.startswith('\n'):
# Then we need to parse the inner contents as
# hierarchical. We support a single level
# of nesting for now.
try:
config_value = _parse_nested(config_value)
except ValueError:
raise botocore.exceptions.ConfigParseError(
path=path)
config[section][option] = config_value
return config
def raw_config_parse(config_filename, parse_subsections=True):
"""Returns the parsed INI config contents.
Each section name is a top level key.
:param config_filename: The name of the INI file to parse
:param parse_subsections: If True, parse indented blocks as
subsections that represent their own configuration dictionary.
For example, if the config file had the contents::
s3 =
signature_version = s3v4
addressing_style = path
The resulting ``raw_config_parse`` would be::
{'s3': {'signature_version': 's3v4', 'addressing_style': 'path'}}
If False, do not try to parse subsections and return the indented
block as its literal value::
{'s3': '\nsignature_version = s3v4\naddressing_style = path'}
:returns: A dict with keys for each profile found in the config
file and the value of each key being a dict containing name
value pairs found in that profile.
:raises: ConfigNotFound, ConfigParseError
"""
config = {}
path = config_filename
if path is not None:
path = os.path.expandvars(path)
path = os.path.expanduser(path)
if not os.path.isfile(path):
raise botocore.exceptions.ConfigNotFound(path=path)
cp = configparser.RawConfigParser()
try:
cp.read(path)
except configparser.Error:
raise botocore.exceptions.ConfigParseError(path=path)
else:
for section in cp.sections():
config[section] = {}
for option in cp.options(section):
config_value = cp.get(section, option)
if parse_subsections and config_value.startswith('\n'):
# Then we need to parse the inner contents as
# hierarchical. We support a single level
# of nesting for now.
try:
config_value = _parse_nested(config_value)
except ValueError:
raise botocore.exceptions.ConfigParseError(
path=path)
config[section][option] = config_value
return config
def handle_exceptions(exclist):
"""Decorator that will handle exceptions and output friendly messages."""
def wrap(f):
"""Returns decorator that wraps/handles exceptions."""
exclist_copy = copy.copy(exclist)
@functools.wraps(f)
def wrapped_f(*args, **kwargs):
"""Wrapped function."""
if not exclist_copy:
f(*args, **kwargs)
else:
exc, handler = exclist_copy.pop(0)
try:
wrapped_f(*args, **kwargs)
except exc as err:
if isinstance(handler, (six.text_type, six.string_types)):
click.echo(handler, err=True)
elif handler is None:
click.echo(str(err), err=True)
else:
click.echo(handler(err), err=True)
sys.exit(EXIT_CODE_DEFAULT)
@functools.wraps(f)
def _handle_any(*args, **kwargs):
"""Default exception handler."""
try:
return wrapped_f(*args, **kwargs)
except click.UsageError as usage_err:
click.echo('Usage error: %s' % str(usage_err), err=True)
sys.exit(EXIT_CODE_DEFAULT)
except Exception as unhandled: # pylint: disable=W0703
with tempfile.NamedTemporaryFile(delete=False, mode='w') as f:
traceback.print_exc(file=f)
click.echo('Error: %s [ %s ]' % (unhandled, f.name),
err=True)
sys.exit(EXIT_CODE_DEFAULT)
return _handle_any
return wrap
def handle_cli_exceptions(exclist):
"""Decorator that will handle exceptions and output friendly messages."""
def wrap(f):
"""Returns decorator that wraps/handles exceptions."""
exclist_copy = copy.copy(exclist)
@functools.wraps(f)
def wrapped_f(*args, **kwargs):
"""Wrapped function."""
if not exclist_copy:
f(*args, **kwargs)
else:
exc, handler = exclist_copy.pop(0)
try:
wrapped_f(*args, **kwargs)
except exc as err:
if handler is None:
raise click.UsageError(
err.response['Error']['Message']
)
elif isinstance(handler, str):
click.echo(err, err=True)
sys.exit(EXIT_CODE_DEFAULT)
@functools.wraps(f)
def _handle_any(*args, **kwargs):
"""Default exception handler."""
try:
return wrapped_f(*args, **kwargs)
except Exception as unhandled: # pylint: disable=W0703
with tempfile.NamedTemporaryFile(delete=False, mode='w') as f:
traceback.print_exc(file=f)
click.echo('Error: %s [ %s ]' % (unhandled, f.name),
err=True)
sys.exit(EXIT_CODE_DEFAULT)
return _handle_any
return wrap
def raw_config_parse(config_filename, parse_subsections=True):
"""Returns the parsed INI config contents.
Each section name is a top level key.
:param config_filename: The name of the INI file to parse
:param parse_subsections: If True, parse indented blocks as
subsections that represent their own configuration dictionary.
For example, if the config file had the contents::
s3 =
signature_version = s3v4
addressing_style = path
The resulting ``raw_config_parse`` would be::
{'s3': {'signature_version': 's3v4', 'addressing_style': 'path'}}
If False, do not try to parse subsections and return the indented
block as its literal value::
{'s3': '\nsignature_version = s3v4\naddressing_style = path'}
:returns: A dict with keys for each profile found in the config
file and the value of each key being a dict containing name
value pairs found in that profile.
:raises: ConfigNotFound, ConfigParseError
"""
config = {}
path = config_filename
if path is not None:
path = os.path.expandvars(path)
path = os.path.expanduser(path)
if not os.path.isfile(path):
raise botocore.exceptions.ConfigNotFound(path=path)
cp = configparser.RawConfigParser()
try:
cp.read(path)
except configparser.Error:
raise botocore.exceptions.ConfigParseError(path=path)
else:
for section in cp.sections():
config[section] = {}
for option in cp.options(section):
config_value = cp.get(section, option)
if parse_subsections and config_value.startswith('\n'):
# Then we need to parse the inner contents as
# hierarchical. We support a single level
# of nesting for now.
try:
config_value = _parse_nested(config_value)
except ValueError:
raise botocore.exceptions.ConfigParseError(
path=path)
config[section][option] = config_value
return config
def get_config_info(self, bot_name, optional=False):
# type: (str, Optional[bool]) -> Dict[str, Any]
if self.bot_config_file is None:
if optional:
return dict()
# Well written bots should catch this exception
# and provide nice error messages with instructions
# on setting up the configuration specfic to this bot.
# And then `run.py` should also catch exceptions on how
# to specify the file in the command line.
raise NoBotConfigException(bot_name)
if bot_name not in self.bot_config_file:
print('''
WARNING!
{} does not adhere to the
file naming convention, and it could be a
sign that you passed in the
wrong third-party configuration file.
The suggested name is {}.conf
We will proceed anyway.
'''.format(self.bot_config_file, bot_name))
# We expect the caller to pass in None if the user does
# not specify a bot_config_file. If they pass in a bogus
# filename, we'll let an IOError happen here. Callers
# like `run.py` will do the command line parsing and checking
# for the existence of the file.
config = configparser.ConfigParser()
with open(self.bot_config_file) as conf:
try:
config.readfp(conf) # type: ignore # readfp->read_file in python 3, so not in stubs
except configparser.Error as e:
display_config_file_errors(str(e), self.bot_config_file)
sys.exit(1)
return dict(config.items(bot_name))
configloader.py 文件源码
项目:tf_aws_ecs_instance_draining_on_scale_in
作者: terraform-community-modules
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def raw_config_parse(config_filename, parse_subsections=True):
"""Returns the parsed INI config contents.
Each section name is a top level key.
:param config_filename: The name of the INI file to parse
:param parse_subsections: If True, parse indented blocks as
subsections that represent their own configuration dictionary.
For example, if the config file had the contents::
s3 =
signature_version = s3v4
addressing_style = path
The resulting ``raw_config_parse`` would be::
{'s3': {'signature_version': 's3v4', 'addressing_style': 'path'}}
If False, do not try to parse subsections and return the indented
block as its literal value::
{'s3': '\nsignature_version = s3v4\naddressing_style = path'}
:returns: A dict with keys for each profile found in the config
file and the value of each key being a dict containing name
value pairs found in that profile.
:raises: ConfigNotFound, ConfigParseError
"""
config = {}
path = config_filename
if path is not None:
path = os.path.expandvars(path)
path = os.path.expanduser(path)
if not os.path.isfile(path):
raise botocore.exceptions.ConfigNotFound(path=path)
cp = configparser.RawConfigParser()
try:
cp.read(path)
except configparser.Error:
raise botocore.exceptions.ConfigParseError(path=path)
else:
for section in cp.sections():
config[section] = {}
for option in cp.options(section):
config_value = cp.get(section, option)
if parse_subsections and config_value.startswith('\n'):
# Then we need to parse the inner contents as
# hierarchical. We support a single level
# of nesting for now.
try:
config_value = _parse_nested(config_value)
except ValueError:
raise botocore.exceptions.ConfigParseError(
path=path)
config[section][option] = config_value
return config