def __init__(self, capture_exc_info=False):
# type: (bool) -> None
"""
:param capture_exc_info:
Whether to capture ``sys.exc_info`` when handling an
exception.
This is turned off by default to reduce memory usage, but
it is useful in certain cases (e.g., if you want to send
exceptions to a logger that expects ``exc_info``).
Regardless, you can still check ``self.has_exceptions`` to
see if an exception occurred.
"""
super(MemoryHandler, self).__init__()
self.messages = OrderedDict() # type: Union[OrderedDict, Dict[Text, List[FilterMessage]]]
self.has_exceptions = False
self.capture_exc_info = capture_exc_info
self.exc_info = [] # type: List[Tuple[type, Exception, TracebackType]]
def get_errors(self, with_context=False):
# type: (bool) -> Dict[Text, List[Dict[Text, Text]]]
"""
Returns a dict of error messages generated by the Filter, in a
format suitable for inclusion in e.g., an API 400 response
payload.
:param with_context:
Whether to include the context object in the result (for
debugging purposes).
Note: context is usually not safe to expose to end users!
"""
return {
key: [m.as_dict(with_context) for m in messages]
for key, messages in iteritems(self.filter_messages)
}
def is_filter_type(target):
# type: (Any) -> Union[bool, Text]
"""
Returns whether the specified object can be registered as a filter.
:return:
Returns ``True`` if the object is a filter.
Otherwise, returns a string indicating why it is not valid.
"""
if not is_class(target):
return 'not a class'
if not issubclass(target, BaseFilter):
return 'does not extend BaseFilter'
if is_abstract(target):
return 'abstract class'
return True
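# Hedged usage sketch for is_filter_type(), assuming ``is_class`` behaves like
# ``inspect.isclass``; the plain class below is hypothetical and exists only to
# exercise the "not a filter" branch.
class _PlainClass(object):
    pass

assert is_filter_type(42) == 'not a class'
assert is_filter_type(_PlainClass) == 'does not extend BaseFilter'
# A concrete (non-abstract) BaseFilter subclass would return ``True``.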
def __init__(self, pattern, keys=None):
# type: (Union[Text, regex._pattern_type, re._pattern_type], Optional[Sequence[Text]]) -> None
"""
:param pattern:
Regex used to split incoming string values.
IMPORTANT: If you specify your own compiled regex, be sure
to add the ``UNICODE`` flag for Unicode support!
:param keys:
If set, the resulting list will be converted into an
OrderedDict, using the specified keys.
IMPORTANT: If ``keys`` is set, the split value's length
must be less than or equal to ``len(keys)``.
"""
super(Split, self).__init__()
self.regex = (
pattern
if isinstance(pattern, (regex._pattern_type, re._pattern_type))
else regex.compile(pattern, regex.UNICODE)
)
self.keys = keys
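# Hedged sketch of the two pattern forms Split() accepts, per the constructor
# above: a plain string is compiled with the UNICODE flag for you, while a
# pre-compiled regex is used as-is (and should therefore carry UNICODE itself).
split_on_commas = Split(r',\s*', keys=('a', 'b', 'c'))
split_precompiled = Split(regex.compile(r'\s*\|\s*', regex.UNICODE))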
def __init__(self, encoding='utf-8', normalize=False):
# type: (Text, bool) -> None
"""
:param encoding:
Used to decode non-unicode values.
:param normalize:
Whether to normalize the unicode value before converting
back into bytes:
- Convert to NFC form.
- Remove non-printable characters.
- Convert all line endings to unix-style ('\n').
Note that ``normalize`` is ``False`` by default for
:py:class:`ByteString`, but ``True`` by default for
:py:class:`Unicode`.
"""
super(ByteString, self).__init__(encoding, normalize)
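# Hedged sketch: ByteString encodes text values to bytes using ``encoding``,
# assuming the filter is run via ``apply()`` as elsewhere in this library
# (cf. the Min(...).apply(...) call in Round.__init__ further below).
as_bytes = ByteString().apply(u'Hello, world!')
# Expected under these assumptions: b'Hello, world!'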
# noinspection SpellCheckingInspection
def add_route(self, command, adapter):
# type: (Text, AdapterSpec) -> RoutingWrapper
"""
Adds a route to the wrapper.
:param command:
The name of the command to route (e.g., "attachToTangle").
:param adapter:
The adapter object or URI to route requests to.
"""
if not isinstance(adapter, BaseAdapter):
try:
adapter = self.adapter_aliases[adapter]
except KeyError:
self.adapter_aliases[adapter] = adapter = resolve_adapter(adapter)
self.routes[command] = adapter
return self
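# Hedged usage sketch for add_route(), assuming RoutingWrapper is constructed
# with a default adapter URI (the node URI below is a placeholder). The adapter
# argument can be an already-built BaseAdapter or a URI string, which is
# resolved via resolve_adapter() and cached in ``adapter_aliases``. Because
# add_route() returns self, calls can be chained.
router = (
    RoutingWrapper('http://localhost:14265')
        .add_route('attachToTangle', 'http://localhost:14265')
)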
def normalise_response_json(self, data):
# type: (Dict[Text, Any]) -> List[Dict[Text, Any]]
"""Transform data to wit.ai format."""
entities = {}
for entity in data["entities"]:
entities[entity["entity"]] = {
"confidence": None,
"type": "value",
"value": entity["value"],
"start": entity["start"],
"end": entity["end"]
}
return [
{
"_text": data["text"],
"confidence": data["intent"]['confidence'],
"intent": data["intent"]['name'],
"entities": entities
}
]
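# Worked example of the wit.ai-style transform above; the payload is invented
# purely to show the resulting shape.
sample = {
    "text": "show me chinese restaurants",
    "intent": {"name": "restaurant_search", "confidence": 0.85},
    "entities": [
        {"entity": "cuisine", "value": "chinese", "start": 8, "end": 15},
    ],
}
# normalise_response_json(sample) would return:
# [{"_text": "show me chinese restaurants",
#   "confidence": 0.85,
#   "intent": "restaurant_search",
#   "entities": {"cuisine": {"confidence": None, "type": "value",
#                            "value": "chinese", "start": 8, "end": 15}}}]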
def normalise_response_json(self, data):
# type: (Dict[Text, Any]) -> Dict[Text, Any]
"""Transform data to luis.ai format."""
top_intent = self._top_intent(data)
ranking = self._ranking(data)
return {
"query": data["text"],
"topScoringIntent": top_intent,
"intents": ranking,
"entities": [
{
"entity": e["value"],
"type": e["entity"],
"startIndex": None,
"endIndex": None,
"score": None
} for e in data["entities"]
] if "entities" in data else []
}
def normalise_request_json(self, data):
# type: (Dict[Text, Any]) -> Dict[Text, Any]
_data = {}
_data["text"] = data["q"][0] if type(data["q"]) == list else data["q"]
if not data.get("project"):
_data["project"] = "default"
elif type(data["project"]) == list:
_data["project"] = data["project"][0]
else:
_data["project"] = data["project"]
if data.get("model"):
_data["model"] = data["model"][0] if type(data["model"]) == list else data["model"]
_data['time'] = data["time"] if "time" in data else None
return _data
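# Worked example for the request normalisation above, assuming the input comes
# from parsed query-string arguments (hence the single-element lists).
raw = {"q": ["hello there"], "project": ["my_project"], "model": ["model_20170628"]}
# normalise_request_json(raw) would produce:
# {"text": "hello there", "project": "my_project",
#  "model": "model_20170628", "time": None}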
def guess_format(files):
# type: (List[Text]) -> Text
"""Given a set of files, tries to guess which data format is used."""
for filename in files:
with io.open(filename, encoding="utf-8-sig") as f:
raw_data = ""
try:
raw_data = f.read()
file_data = json.loads(raw_data)
if "data" in file_data and type(file_data.get("data")) is list:
return WIT_FILE_FORMAT
elif "luis_schema_version" in file_data:
return LUIS_FILE_FORMAT
elif "supportedLanguages" in file_data:
return DIALOGFLOW_FILE_FORMAT
elif "rasa_nlu_data" in file_data:
return RASA_FILE_FORMAT
except ValueError:
if "## intent:" in raw_data:
return MARKDOWN_FILE_FORMAT
return UNK_FILE_FORMAT
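# Hedged sketch for guess_format(): write a minimal rasa-style training file
# and let the function sniff it. RASA_FILE_FORMAT is assumed to be the same
# constant used in the function body above.
import io
import os
import tempfile

_path = os.path.join(tempfile.mkdtemp(), "demo_training_data.json")
with io.open(_path, "w", encoding="utf-8") as f:
    f.write(u'{"rasa_nlu_data": {"common_examples": []}}')

assert guess_format([_path]) == RASA_FILE_FORMAT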
def get_component_class(component_name):
# type: (Text) -> Optional[Type[Component]]
"""Resolve component name to a registered components class."""
if component_name not in registered_components:
try:
return utils.class_from_module_path(component_name)
except Exception:
raise Exception(
"Failed to find component class for '{}'. Unknown "
"component name. Check your configured pipeline and make "
"sure the mentioned component is not misspelled. If you "
"are creating your own component, make sure it is either "
"listed as part of the `component_classes` in "
"`rasa_nlu.registry.py` or is a proper name of a class "
"in a module.".format(component_name))
return registered_components[component_name]
def validate_requirements(component_names, dev_requirements_file="alt_requirements/requirements_dev.txt"):
# type: (List[Text], Text) -> None
"""Ensures that all required python packages are installed to instantiate and used the passed components."""
from rasa_nlu import registry
# Validate that all required packages are installed
failed_imports = set()
for component_name in component_names:
component_class = registry.get_component_class(component_name)
failed_imports.update(find_unavailable_packages(component_class.required_packages()))
if failed_imports: # pragma: no cover
# if available, use the development file to figure out the correct version numbers for each requirement
all_requirements = _read_dev_requirements(dev_requirements_file)
if all_requirements:
missing_requirements = [r for i in failed_imports for r in all_requirements[i]]
raise Exception("Not all required packages are installed. " +
"Failed to find the following imports {}. ".format(", ".join(failed_imports)) +
"To use this pipeline, you need to install the missing dependencies, e.g. by running:\n\t" +
"> pip install {}".format(" ".join(missing_requirements)))
else:
raise Exception("Not all required packages are installed. " +
"To use this pipeline, you need to install the missing dependencies. " +
"Please install {}".format(", ".join(failed_imports)))
def validate_arguments(pipeline, context, allow_empty_pipeline=False):
# type: (List[Component], Dict[Text, Any], bool) -> None
"""Validates a pipeline before it is run. Ensures, that all arguments are present to train the pipeline."""
# Ensure the pipeline is not empty
if not allow_empty_pipeline and len(pipeline) == 0:
raise ValueError("Can not train an empty pipeline. " +
"Make sure to specify a proper pipeline in the configuration using the `pipeline` key." +
"The `backend` configuration key is NOT supported anymore.")
provided_properties = set(context.keys())
for component in pipeline:
for r in component.requires:
if r not in provided_properties:
raise Exception("Failed to validate at component '{}'. Missing property: '{}'".format(
component.name, r))
provided_properties.update(component.provides)
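# Minimal sketch of the requires/provides contract checked above, using
# hypothetical stub components (real pipelines use rasa_nlu Component classes).
class _Tokenizer(object):
    name = "tokenizer"
    requires = []
    provides = ["tokens"]

class _Featurizer(object):
    name = "featurizer"
    requires = ["tokens"]
    provides = ["features"]

# Passes: "tokens" is provided before the featurizer requires it.
validate_arguments([_Tokenizer(), _Featurizer()], context={})
# Reversing the order would raise, because "tokens" is not yet provided.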
def load_component(self, component_name, model_dir, model_metadata, **context):
# type: (Text, Text, Metadata, **Any) -> Component
"""Tries to retrieve a component from the cache, calls `load` to create a new component."""
from rasa_nlu import registry
from rasa_nlu.model import Metadata
try:
cached_component, cache_key = self.__get_cached_component(component_name, model_metadata)
component = registry.load_component_by_name(component_name, model_dir,
model_metadata, cached_component, **context)
if not cached_component:
# If the component wasn't in the cache, let us add it if possible
self.__add_to_cache(component, cache_key)
return component
except MissingArgumentError as e: # pragma: no cover
raise Exception("Failed to load component '{}'. {}".format(component_name, e))
def parse(self, text, time=None):
# type: (Text, Optional[Any]) -> Dict[Text, Any]
"""Parse the input text, classify it and return pipeline result.
The pipeline result usually contains intent and entities."""
if not text:
# Not all components are able to handle empty strings. So we need
# to prevent that... This default return will not contain all
# output attributes of all components, but in the end, no one should
# pass an empty string in the first place.
output = self.default_output_attributes()
output["text"] = ""
return output
message = Message(text, self.default_output_attributes(), time=time)
for component in self.pipeline:
component.process(message, **self.context)
output = self.default_output_attributes()
output.update(message.as_dict(only_output_properties=True))
return output
def do_train(config, # type: RasaNLUConfig
component_builder=None # type: Optional[ComponentBuilder]
):
# type: (...) -> Tuple[Trainer, Interpreter, Text]
"""Loads the trainer and the data and runs the training of the model."""
# Ensure we are training a model that we can save in the end
# WARN: there is still a race condition if a model with the same name is
# trained in another subprocess
trainer = Trainer(config, component_builder)
persistor = create_persistor(config)
training_data = load_data(config['data'], config['language'])
interpreter = trainer.train(training_data)
persisted_path = trainer.persist(config['path'], persistor,
config['project'],
config['fixed_model_name'])
return trainer, interpreter, persisted_path
def load(cls,
model_dir=None, # type: Text
model_metadata=None, # type: Metadata
cached_component=None, # type: Optional[DucklingExtractor]
**kwargs # type: **Any
):
# type: (...) -> DucklingExtractor
persisted = os.path.join(model_dir,
model_metadata.get("ner_duckling_persisted"))
if cached_component:
duckling = cached_component.duckling
else:
language = model_metadata.get("language")
duckling = cls.create_duckling_wrapper(language)
if os.path.isfile(persisted):
with io.open(persisted, encoding='utf-8') as f:
persisted_data = json.loads(f.read())
return DucklingExtractor(duckling, persisted_data["dimensions"])
return DucklingExtractor(duckling)
def load(cls,
model_dir=None, # type: Text
model_metadata=None, # type: Metadata
cached_component=None, # type: Optional[DucklingHTTPExtractor]
**kwargs # type: **Any
):
# type: (...) -> DucklingHTTPExtractor
persisted = os.path.join(model_dir, model_metadata.get(cls.name))
config = kwargs.get("config", {})
dimensions = None
if os.path.isfile(persisted):
with io.open(persisted, encoding='utf-8') as f:
persisted_data = simplejson.loads(f.read())
dimensions = persisted_data["dimensions"]
return DucklingHTTPExtractor(config.get("duckling_http_url"),
model_metadata.get("language"),
dimensions)
def load(cls,
model_dir, # type: Text
model_metadata, # type: Metadata
cached_component, # type: Optional[CRFEntityExtractor]
**kwargs # type: **Any
):
# type: (...) -> CRFEntityExtractor
from sklearn.externals import joblib
if model_dir and model_metadata.get("entity_extractor_crf"):
meta = model_metadata.get("entity_extractor_crf")
ent_tagger = joblib.load(os.path.join(model_dir, meta["model_file"]))
return CRFEntityExtractor(ent_tagger=ent_tagger,
entity_crf_features=meta['crf_features'],
entity_crf_BILOU_flag=meta['BILOU_flag'])
else:
return CRFEntityExtractor()
def persist(self, model_dir):
# type: (Text) -> Dict[Text, Any]
"""Persist this model into the passed directory.
Returns the metadata necessary to load the model again."""
from sklearn.externals import joblib
if self.ent_tagger:
model_file_name = os.path.join(model_dir, "crf_model.pkl")
joblib.dump(self.ent_tagger, model_file_name)
return {"entity_extractor_crf": {"model_file": "crf_model.pkl",
"crf_features": self.crf_features,
"BILOU_flag": self.BILOU_flag,
"version": 1}}
else:
return {"entity_extractor_crf": None}
def _from_json_to_crf(self, message, entity_offsets):
# type: (Message, List[Tuple[int, int, Text]]) -> List[Tuple[Text, Text, Text, Text]]
"""Takes the json examples and switches them to a format which crfsuite likes."""
from spacy.gold import GoldParse
doc = message.get("spacy_doc")
gold = GoldParse(doc, entities=entity_offsets)
ents = [l[5] for l in gold.orig_annot]
if '-' in ents:
logger.warn("Misaligned entity annotation in sentence '{}'. ".format(doc.text) +
"Make sure the start and end values of the annotated training " +
"examples end at token boundaries (e.g. don't include trailing whitespaces).")
if not self.BILOU_flag:
for i, entity in enumerate(ents):
if entity.startswith('B-') or \
entity.startswith('I-') or \
entity.startswith('U-') or \
entity.startswith('L-'):
ents[i] = entity[2:] # removes the BILOU tags
return self._from_text_to_crf(message, ents)
def __init__(self, name=None):
# type: (Optional[Text]) -> None
super(TestFilterBravo, self).__init__()
self.name = name
def __init__(self, message, context, exc_info=None):
# type: (Text, dict, Text) -> None
"""
:param exc_info: Exception traceback (if applicable).
"""
super(FilterMessage, self).__init__()
self.message = message
self.context = context
self.code = context.get('code') or message
self.exc_info = exc_info
def errors(self):
# type: () -> Dict[Text, List[Dict[Text, Text]]]
"""
Returns a dict of error messages generated by the Filter, in a
format suitable for inclusion in e.g., an API 400 response
payload.
E.g.::
{
'authToken': [
{
'code': 'not_found',
'message':
'No AuthToken found matching this value.',
},
],
'data.foobar': [
{
'code': 'unexpected',
'message': 'Unexpected key "foobar".',
},
],
# etc.
}
"""
return self.get_errors()
def filter_messages(self):
# type: () -> Dict[Text, List[FilterMessage]]
"""
Returns the raw FilterMessages that were generated by the
Filter.
"""
self.full_clean()
return self._handler.messages
def __getattr__(self, item):
# type: (Text) -> Type[BaseFilter]
return self[item]
def _get_cache(self):
# type: () -> Dict[Text, Type[BaseFilter]]
if self._cache is None:
self._cache = {}
try:
for target in iter_entry_points(self.group): # type: EntryPoint
filter_ = target.load()
ift_result = is_filter_type(filter_)
if ift_result is True:
logger.debug(
'Registering extension filter '
'{cls.__module__}.{cls.__name__} as {name}.'.format(
cls = filter_,
name = target.name,
),
)
self._cache[target.name] = filter_
else:
logger.debug(
'Using legacy extension loader for '
'{target.name} ({reason}).'.format(
reason = ift_result,
target = target,
),
)
self._cache.update(iter_filters_in(filter_))
except DeprecationWarning:
# The user has ``simplefilter('error')`` set; reset the
# cache so that the next time we try to load extension
# filters, we don't miss anything.
self._cache = None
raise
# noinspection PyTypeChecker
return self._cache
def __init__(self,
to_nearest = 1,
rounding = ROUND_HALF_UP,
result_type = DecimalType,
):
# type: (Union[int, Text, DecimalType], Text, type) -> None
"""
:param to_nearest:
The value that the filter should round to.
E.g., ``Round(1)`` rounds to the nearest whole number.
If you want to round to a float value, it is recommended
that you provide it as a string or Decimal, to avoid
floating point problems.
:param rounding:
Controls how to round values.
:param result_type:
The type of result to return.
"""
super(Round, self).__init__()
self.to_nearest = DecimalType(to_nearest)
# Rounding to negative values isn't supported.
# I'm not even sure if that concept is valid.
Min(DecimalType('0')).apply(self.to_nearest)
self.result_type = result_type
self.rounding = rounding
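# Hedged usage sketch for Round, assuming filters are run via ``apply()`` just
# as Min(...).apply(...) is used in the constructor above. Passing
# ``to_nearest`` as a string (or Decimal) sidesteps floating-point artefacts.
rounded = Round(to_nearest='0.05').apply(DecimalType('3.24'))
# Expected under these assumptions: DecimalType('3.25').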
def ip_type(self):
# type: () -> Text
"""
Returns the IP address versions that this Filter accepts.
"""
return '/'.join(filter(None, [
'IPv4' if self.ipv4 else None,
'IPv6' if self.ipv6 else None,
]))
def __init__(self, decoder=json.loads):
# type: (Callable[[Text], Any]) -> None
super(JsonDecode, self).__init__()
self.decoder = decoder
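# Hedged sketch: JsonDecode passes the incoming string to ``decoder``
# (json.loads by default); again assuming the filter is invoked via ``apply()``.
decoded = JsonDecode().apply(u'{"answer": 42}')
# Expected under these assumptions: {"answer": 42} (or {u'answer': 42} on Python 2).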