def _send_receive(self, nummsgs: int, outformat: str='json',
                  dataupdate: Optional[Dict[AnyStr, Any]]=None,
                  restart_data: bool=True) -> List[Response]:
    """Buffer ``nummsgs`` messages, run them through a processor, load the results.

    :param nummsgs: number of messages handed to ``_add_to_buffer``
    :param outformat: format name forwarded to every helper and to
        ``get_processor_instance``
    :param dataupdate: optional entries merged into ``self.data`` before the
        messages are buffered
    :param restart_data: when true, ``_restart_data`` is invoked first
    :return: whatever ``_loadResults`` produces for ``outformat``
    """
    if restart_data:
        self._restart_data(outformat)
    if dataupdate:
        self.data.update(dataupdate)

    self._add_to_buffer(nummsgs, outformat)
    # Rewind so the processor reads the buffered requests from the start.
    self.sendbuffer.seek(0)

    buffers = dict(custom_outbuffer=self.recvbuffer,
                   custom_inbuffer=self.sendbuffer)
    handler, _unused = get_processor_instance(outformat, **buffers)
    handler.process_requests(self.sendbuffer)
    return self._loadResults(outformat)
# Example source code using Python's typing.Dict
def calc_rs_modality(self) -> Dict[str, float]:
    """Return, for each modality tag, the share of sentences that contain it.

    Every sentence in ``self.sentences`` is parsed with ``self.knp``; the
    feature strings of its bunsetsu are concatenated and searched for
    modality tags. A tag is counted at most once per sentence, and the
    counts are divided by the number of sentences.

    :return: mapping of modality name to its per-sentence rate
    """
    # NOTE(review): the pattern arrived as "<?????-(.+?)>", which is not a
    # valid regular expression (it raises "multiple repeat"). The five "?"
    # look like mis-encoded characters; restored here to the KNP feature
    # prefix, which is presumed to be the original text -- confirm.
    modality_pattern = re.compile("<モダリティ-(.+?)>")

    modality_counter = Counter()
    for sentence in self.sentences:
        chunks = [Chunk(chunk_id=bnst.bnst_id,
                        link=bnst.parent,
                        description=bnst.fstring)
                  for bnst in self.knp.parse(sentence).bnst_list()]
        features = "".join(chunk.description for chunk in chunks)
        # A set, so each modality counts once per sentence.
        modality_counter.update(set(modality_pattern.findall(features)))

    n = len(self.sentences)
    # An empty sentence list leaves the counter empty, so no division happens.
    return {k: float(c) / n for k, c in modality_counter.items()}
def _load_word_freq(self, threshold: int) -> Tuple[Dict[str, int], int]:
    """Read ``word freq`` lines from ``self.rnnlm_model_path``.

    Words whose frequency is strictly greater than ``threshold`` keep their
    frequency; every other line adds one to the ``'<unk/>'`` entry.

    :param threshold: frequencies at or below this value are pooled
    :return: tuple of (word -> frequency mapping, number of lines read)
    """
    unknown_token = '<unk/>'
    frequencies = {}
    line_count = 0
    with open(self.rnnlm_model_path, mode='r') as model_file:
        for line_count, entry in enumerate(model_file, start=1):
            token, raw_count = entry.split(' ')
            count = int(raw_count)
            if count <= threshold:
                frequencies[unknown_token] = frequencies.get(unknown_token, 0) + 1
                continue
            frequencies[token] = count
    return (frequencies, line_count)
def __init__(self, client: Session) -> None:
    """Make new NetworkManager.

    Initialises the request bookkeeping containers and subscribes the
    manager's handlers to the ``Network.*`` events emitted by ``client``.

    :param client: session object; must provide ``on(event, callback)``
    """
    super().__init__()
    self._client = client
    self._requestIdToRequest: Dict[str, Request] = dict()
    self._interceptionIdToRequest: Dict[str, Request] = dict()
    self._extraHTTPHeaders: OrderedDict[str, str] = OrderedDict()
    self._requestInterceptionEnabled = False
    self._requestHashToRequestIds = Multimap()
    self._requestHashToInterceptions = Multimap()

    # Registration order is kept exactly as listed.
    subscriptions = (
        ('Network.requestWillBeSent', self._onRequestWillBeSent),
        ('Network.requestIntercepted', self._onRequestIntercepted),
        ('Network.responseReceived', self._onResponseReceived),
        ('Network.loadingFinished', self._onLoadingFinished),
        ('Network.loadingFailed', self._onLoadingFailed),
    )
    for event_name, callback in subscriptions:
        self._client.on(event_name, callback)
async def waitForNavigation(self, options: dict = None, **kwargs: Any
                            ) -> Optional[Response]:
    """Wait until the navigation completes.

    This is a coroutine: the body awaits the navigation watcher, so it has
    to be declared with ``async def``.

    :param options: optional settings passed to ``NavigatorWatcher``; the
        caller's dict is copied, not modified
    :param kwargs: extra settings; they override entries of ``options``
    :return: the response recorded for ``self.url`` while waiting, or
        ``None`` if no response with that URL was seen
    """
    merged = dict(options) if options is not None else dict()
    merged.update(kwargs)

    watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
                               merged)
    responses: Dict[str, Response] = dict()
    listener = helper.addEventListener(
        self._networkManager,
        NetworkManager.Events.Response,
        lambda response: responses.__setitem__(response.url, response)
    )
    try:
        await watcher.waitForNavigation()
    finally:
        # Always detach, so a failed wait does not leave the listener behind.
        helper.removeEventListeners([listener])
    return responses.get(self.url)
def __init__(self, options: Dict[str, Any] = None, **kwargs: Any) -> None:
    """Make new launcher.

    :param options: launcher options; copied, so the caller's dict is
        left untouched
    :param kwargs: extra options; they override entries of ``options``

    Recognised here: ``headless`` (treated as true when absent) and
    ``executablePath``. Without ``executablePath`` the chromium build is
    downloaded if ``check_chromium()`` reports it missing.
    """
    # Copy so merging kwargs does not modify the dict the caller passed in.
    self.options = dict(options or {})
    self.options.update(kwargs)
    # Copy so in-place edits of chrome_args cannot leak into DEFAULT_ARGS.
    self.chrome_args = list(DEFAULT_ARGS)
    self._tmp_user_data_dir: Optional[str] = None
    self._parse_args()

    if self.options.get('headless', True):
        self.chrome_args = self.chrome_args + [
            '--headless',
            '--disable-gpu',
            '--hide-scrollbars',
            '--mute-audio',
        ]

    if 'executablePath' in self.options:
        self.exec = self.options['executablePath']
    else:
        if not check_chromium():
            download_chromium()
        self.exec = str(chromium_excutable())
    self.cmd = [self.exec] + self.chrome_args
async def _visibleCenter(self) -> Dict[str, int]:
    """Scroll the element into view and return the centre of its visible part.

    This is a coroutine: the body awaits ``self.evaluate``, so it has to be
    declared with ``async def``.

    :return: the ``{x, y}`` object computed by the evaluated function
    :raises BrowserError: when the evaluation result is falsy
    """
    center = await self.evaluate('''
element => {
if (!element.ownerDocument.contains(element))
return null;
element.scrollIntoViewIfNeeded();
let rect = element.getBoundingClientRect();
return {
x: (Math.max(rect.left, 0) + Math.min(rect.right, window.innerWidth)) / 2,
y: (Math.max(rect.top, 0) + Math.min(rect.bottom, window.innerHeight)) / 2
};
}
''')  # noqa: E501
    if not center:
        # The evaluated function returns null when the element is no longer
        # contained in its owner document.
        raise BrowserError('No node found for selector: ')
    return center
def __init__(self, database, tabledesc=None, timestamp=False):
    # type: (str, Optional[Dict[str, str]], bool) -> None
    """Prepare the SQL statements for a key/value table.

    :param database: stored unchanged on ``self.database``
    :param tabledesc: values substituted into the SQL templates; defaults
        to the result of ``self.prefixdesc()``
    :param timestamp: when true, ``self._sqtimestamp`` is spliced into the
        CREATE statement, otherwise an empty string is used
    """
    self.database = database
    desc = self.prefixdesc() if tabledesc is None else tabledesc
    self.tabledesc = desc

    # The CREATE template takes the timestamp fragment through "%" first,
    # then the table description through str.format.
    if timestamp:
        create_template = self._sqcreate % self._sqtimestamp
    else:
        create_template = self._sqcreate % ''
    self.kv_create = create_template.format(**desc)

    self.kv_get = self._sqget.format(**desc)
    self.kv_mget = self._sqmget.format(**desc)
    self.kv_put = self._sqput.format(**desc)
    self.kv_delete = self._sqdelete.format(**desc)

    self._connection = None  # type: Optional[sqlite3.Connection]
    self.sqlite_limit_variable_number = 999
    self.support_mget = True
def mget(self, keys):
    # type: (List[str]) -> List[Optional[bytes]]
    """Look up several keys and return their values in the order of ``keys``.

    While ``self.support_mget`` is true the keys are fetched in batches of
    at most ``self.sqlite_limit_variable_number``. If SQLite raises
    ``OperationalError`` the flag is cleared and this call (and later ones)
    fall back to one lookup per key. Keys without a row yield ``None`` on
    the batched path.
    """
    if not self.support_mget:
        return [self.__get(key) for key in keys]

    found = []
    try:
        with self.conn as conn:
            for batch in grouper(self.sqlite_limit_variable_number, keys):
                params = list(batch)
                placeholders = ','.join('?' for _ in params)
                found.extend(conn.execute(self.kv_mget % placeholders, params))
        lookup = dict(found)  # type: Dict[str, bytes]
        return [lookup.get(key) for key in keys]
    except sqlite3.OperationalError:
        self.support_mget = False
    return [self.__get(key) for key in keys]
def contains_all_options(optiongroup, parentstyle, matchvalues=False):
    # type: (Dict[Any, Any], Style, bool) -> bool
    """Tell whether every option of optiongroup also exists in parentstyle.

    Dict-valued options are compared recursively against the matching
    sub-options of parentstyle. With matchvalues=True the remaining values
    must additionally have the same exact type and compare equal.
    """
    for name, expected in optiongroup.items():
        if name not in parentstyle:
            return False

        if isinstance(expected, dict):
            nested_parent = parentstyle[name]
            if not contains_all_options(expected, nested_parent,
                                        matchvalues=matchvalues):
                return False
            continue

        if not matchvalues:
            continue

        actual = parentstyle.get(name)
        # Exact type comparison on purpose: 1 and 1.0 must not match.
        if type(actual) != type(expected) or actual != expected:
            return False
    return True
def misses_to_frame(parsed_lexemes: Iterable,
                    terms: Dict[str, str]=None) -> pd.DataFrame:
    """Tabulate the misses collected from ``parsed_lexemes``.

    :param parsed_lexemes: forwarded to ``collect_misses``
    :param terms: optional mapping from a lower-cased miss to its term;
        a miss without an entry is used as its own term
    :return: frame indexed by ``miss`` with the columns ``term`` and
        ``lexemes`` (the collected values joined by single spaces)
    """
    term_lookup = terms if terms else {}
    collected = collect_misses(parsed_lexemes)

    records = []
    for original in collected:
        lowered = original.lower()
        records.append(OrderedDict([
            ('miss', lowered),
            ('term', term_lookup.get(lowered, lowered)),
            ('lexemes', ' '.join(collected[original])),
        ]))

    return pd.DataFrame.from_records(
        records, index='miss', columns=['miss', 'term', 'lexemes'])
def get_meta(self) -> t.Generator[t.Dict, None, None]:
    """Yield ``get_field_meta(f)`` for every model field that passes the filters.

    A field is skipped when its name is in ``self.exclude``, or when
    ``self.fields`` is non-empty and does not list it. Fields that are not
    listed in ``self.fields`` must also have their ``concrete``,
    ``auto_created`` and ``editable`` flags contained in the matching
    ``*_in`` collections.
    """
    candidates = self.model._meta.get_fields(
        include_parents=self.include_parents, include_hidden=self.include_hidden
    )
    for field in candidates:
        name = field.name
        if name in self.exclude:
            continue

        listed = name in self.fields
        if self.fields and not listed:
            continue

        # Explicitly listed fields bypass the flag filters.
        if not listed and not (
            field.concrete in self.concrete_in
            and field.auto_created in self.auto_created_in
            and field.editable in self.editable_in
        ):
            continue

        yield self.get_field_meta(field)
def update_game_state(self) -> None:
    """Recompute ``nextCardPlay`` and ``maxCardPlay`` for every pile colour.

    Discards are tallied per colour in a six-slot list indexed by card
    value. ``nextCardPlay`` becomes the number of played cards plus one.
    ``maxCardPlay`` starts at ``Value.V5`` and is replaced by the first
    value, scanning ``Value`` in reverse, whose discard tally is still
    below its ``num_copies``.
    """
    tally: Dict[Color, List[int]] = {
        colour: [0] * 6 for colour in self.game.variant.pile_colors
    }
    for deck_index in self.game.discards:
        info: CardInfo = gameCards[deck_index]
        tally[info.color][info.value] += 1

    self.nextCardPlay: Dict[Color, int] = {
        colour: len(self.game.playedCards[colour]) + 1
        for colour in self.game.variant.pile_colors
    }

    self.maxCardPlay: Dict[Color, Value] = {
        colour: Value.V5 for colour in self.game.variant.pile_colors
    }
    for colour in self.game.variant.pile_colors:
        candidate = next(
            (value for value in reversed(Value)  # type: ignore
             if tally[colour][value] < value.num_copies),
            None,
        )
        if candidate is not None:
            self.maxCardPlay[colour] = candidate
def received(self, type: str, resp: Dict[str, Any]) -> None:
    """Dispatch one incoming message according to its ``type``.

    ``message``, ``notify`` and ``action`` are forwarded to the matching
    handler; ``advanced`` (fast UI animations) and ``connected`` (players
    connected to the game) are ignored; ``init`` and any unknown type raise
    ``Exception`` (unknown types are printed first).
    """
    if type == 'init':
        raise Exception()

    if type in ('advanced', 'connected'):
        return

    if type == 'message':
        self.message_received(resp['text'])
        return
    if type == 'notify':
        self.handle_notify(resp)
        return
    if type == 'action':
        self.decide_action(resp['can_clue'], resp['can_discard'])
        return

    print(type, resp)
    raise Exception()
def updateMaxScore(self) -> None:
    """Recompute ``self.maxScore`` from the current discard piles.

    Each pile suit starts at 5. Scanning ``Rank`` in reverse, every rank
    whose discard count equals its total number of copies lowers the
    suit's value to ``rank.value - 1``. For the ``OneOfEach`` variant the
    ``Extra`` suit counts one additional copy per rank. The per-suit
    values are summed.
    """
    total: int = 0
    for suit in self.variant.pile_suits:
        discarded: Dict[Rank, int] = dict.fromkeys(Rank, 0)
        for deck_index in self.discards[suit]:
            entry: ServerCard = self.deck[deck_index]
            discarded[entry.rank] += 1

        bonus_copy = self.variant == Variant.OneOfEach and suit == Suit.Extra
        reachable: int = 5
        for rank in reversed(Rank):  # type: ignore
            limit: int = rank.num_copies
            if bonus_copy:
                limit += 1
            if discarded[rank] == limit:
                reachable = rank.value - 1
        total += reachable
    self.maxScore = total
def _extract_positional_label_by_id(self, files: Iterable[Path]) -> Dict[str, Union[PositionalLabel, str]]:
    """Map ``<xml name><microphone ending>`` ids to the label of their XML file.

    Only ``.xml`` files whose extension-less name matches
    ``self.id_filter_regex`` are considered, and an id is emitted only when
    the corresponding ``.wav`` file exists next to the XML file.
    """
    microphone_endings = [
        "_Yamaha",
        "_Kinect-Beam",
        "_Kinect-RAW",
        "_Realtek",
        "_Samson",
        "_Microsoft-Kinect-Raw"
    ]

    # First pass: select the XML files, as a complete list.
    selected = []
    for candidate in files:
        if not candidate.name.endswith(".xml"):
            continue
        if self.id_filter_regex.match(name_without_extension(candidate)):
            selected.append(candidate)

    # Second pass: one entry per (file, microphone) with an existing wav.
    labels_by_id = OrderedDict()
    for xml_file in selected:
        for ending in microphone_endings:
            sample_id = name_without_extension(xml_file) + ending
            if not (Path(xml_file.parent) / (sample_id + ".wav")).exists():
                continue
            labels_by_id[sample_id] = self._extract_label_from_xml(xml_file)
    return labels_by_id
def _input_dictionary_for_loss_net(self, labeled_spectrogram_batch: List[LabeledSpectrogram]) -> Dict[str, ndarray]:
    """Assemble the named inputs of the loss net for one labeled batch.

    :param labeled_spectrogram_batch: examples providing
        ``z_normalized_transposed_spectrogram()`` and ``label``
    :return: dictionary keyed by the ``Wav2Letter.InputNames`` entries plus
        ``'keras_learning_phase'``
    """
    batch_size = len(labeled_spectrogram_batch)
    spectrograms = []
    labels = []
    for example in labeled_spectrogram_batch:
        spectrograms.append(example.z_normalized_transposed_spectrogram())
        labels.append(example.label)

    input_batch, prediction_lengths = self._input_batch_and_prediction_lengths(spectrograms)

    # Sets learning phase to training to enable dropout (see backend.learning_phase documentation for more info):
    training_phase_flag_tensor = array([True])

    # One row per example, holding the length of its label.
    label_lengths = reshape(array([len(label) for label in labels]), (batch_size, 1))

    inputs = {}
    inputs[Wav2Letter.InputNames.input_batch] = input_batch
    inputs[Wav2Letter.InputNames.prediction_lengths] = self._prediction_length_batch(
        prediction_lengths, batch_size=len(spectrograms))
    inputs[Wav2Letter.InputNames.label_batch] = self.grapheme_encoding.encode_label_batch(labels)
    inputs[Wav2Letter.InputNames.label_lengths] = label_lengths
    inputs['keras_learning_phase'] = training_phase_flag_tensor
    return inputs
def create_articles(pmids: Set[str], login: object, write: bool = True) -> Dict[str, str]:
    """
    Build (or look up) one article item per pmid
    :param pmids: pmids to process
    :param login: wdi_core login instance
    :param write: actually perform write; when false every pmid maps to 'Q1'
    :return: map pmid -> wdid; pmids whose creation failed are left out
    """
    wdid_by_pmid = dict()
    for pmid in pmids:
        item = wdi_helpers.PubmedItem(pmid)
        if not write:
            wdid_by_pmid[pmid] = 'Q1'
            continue
        try:
            wdid = item.get_or_create(login)
        except Exception as e:
            # Best effort: report the failure and carry on with the rest.
            print("Error creating article pmid: {}, error: {}".format(pmid, e))
        else:
            wdid_by_pmid[pmid] = wdid
    return wdid_by_pmid
def __init__(self, variables: List[VariableIdentifier], lattices: Dict[Type, Type[Lattice]],
             arguments: Dict[Type, Dict[str, Any]] = None):
    """Create a mapping Var -> L from each variable in Var to the corresponding element in L.

    :param variables: list of program variables
    :param lattices: dictionary from variable types to the corresponding lattice types
    :param arguments: dictionary from variable types to arguments of the corresponding lattices;
        when omitted, every lattice is constructed without arguments
    :raises ValueError: when a lookup by variable type raises ``KeyError``
    """
    super().__init__()
    if arguments is None:
        # Build a fresh mapping per instance. A defaultdict written in the
        # signature is created once and then shared -- and grown by every
        # lookup -- across all instances.
        arguments = defaultdict(dict)
    self._variables = variables
    self._lattices = lattices
    self._arguments = arguments
    try:
        self._store = {v: lattices[type(v.typ)](**arguments[type(v.typ)]) for v in variables}
    except KeyError as key:
        error = f"Missing lattice for variable type {repr(key.args[0])}!"
        raise ValueError(error) from key
def preprocess_samples(
    self,
    states: List[Dict[str, float]],
    actions: List[Dict[str, float]],
    rewards: List[float],
    next_states: List[Dict[str, float]],
    next_actions: List[Dict[str, float]],
    is_terminals: List[bool],
    possible_next_actions: List[List[Dict[str, float]]],
    reward_timelines: List[Dict[int, float]],
) -> TrainingDataPage:
    """Preprocess via ``GridworldContinuous`` and collapse the state matrices.

    ``states`` and ``next_states`` of the resulting page are replaced by a
    single float32 column holding the column index of every entry equal to
    1.0 (presumably the rows are one-hot -- confirm against the parent
    implementation).
    """
    tdp = GridworldContinuous.preprocess_samples(
        self, states, actions, rewards, next_states, next_actions,
        is_terminals, possible_next_actions, reward_timelines
    )
    for attribute in ('states', 'next_states'):
        matrix = getattr(tdp, attribute)
        column_indices = np.where(matrix == 1.0)[1]
        setattr(tdp, attribute,
                column_indices.reshape(-1, 1).astype(np.float32))
    return tdp
def preprocess_samples(
    self,
    states: List[Dict[str, float]],
    actions: List[str],
    rewards: List[float],
    next_states: List[Dict[str, float]],
    next_actions: List[str],
    is_terminals: List[bool],
    possible_next_actions: List[List[str]],
    reward_timelines: Optional[List[Dict[int, float]]],
) -> TrainingDataPage:
    """Preprocess via ``preprocess_samples_discrete`` and collapse the state matrices.

    ``states`` and ``next_states`` of the resulting page are replaced by a
    single float32 column holding the column index of every entry equal to
    1.0 (presumably the rows are one-hot -- confirm against
    ``preprocess_samples_discrete``).
    """
    tdp = self.preprocess_samples_discrete(
        states, actions, rewards, next_states, next_actions, is_terminals,
        possible_next_actions, reward_timelines
    )
    for attribute in ('states', 'next_states'):
        matrix = getattr(tdp, attribute)
        column_indices = np.where(matrix == 1.0)[1]
        setattr(tdp, attribute,
                column_indices.reshape(-1, 1).astype(np.float32))
    return tdp
def to_dict(self) -> Dict[str, str or float or Dict[str, str or int]]:
    """
    Serialises the message into plain dictionary data
    :return: The message data as a dictionary which can be used
             to store a message as a JSON file
    """
    # NOTE: "str or float or ..." in the annotation evaluates to plain
    # ``str`` at runtime; it documents intent only.
    group = self.sender_group
    group_data = group.to_dict() if group is not None else None

    data = dict()
    data["message_title"] = self.message_title
    data["message_body"] = self.message_body
    data["sender"] = self.sender.to_dict()
    data["sender_group"] = group_data
    data["receiver"] = self.receiver.to_dict()
    data["timestamp"] = self.timestamp
    return data
def from_dict(data: Dict[str, str or float or Dict[str, str or int]])\
        -> Message:
    """
    Builds a Message object out of dictionary data
    :param data: The dictionary to turn into a Message
    :return: The generated Message object
    """
    group_data = data["sender_group"]
    sender_group = None if group_data is None \
        else contact_from_dict(group_data)

    title = data["message_title"]
    body = data["message_body"]
    # The receiver is passed to Message before the sender.
    receiver = contact_from_dict(data["receiver"])
    sender = contact_from_dict(data["sender"])
    timestamp = data["timestamp"]

    return Message(title, body, receiver, sender, sender_group, timestamp)
def test_importing(self):
    """
    Checks that the import statement handler resolves both a plain
    module import and a "from ... import ..." statement
    :return: None
    """
    from typing import Dict

    handle = self.config_handler.__handle_import_statement__
    imported_os = handle("import os")
    imported_dict = handle("from typing import Dict")

    self.assertEqual(os, imported_os)
    self.assertEqual(imported_dict, Dict)
def get_unsent_reminders(database: sqlite3.Connection) \
        -> List[Dict[str, str or int or datetime]]:
    """
    Fetches every reminder whose 'sent' flag is 0
    :param database: The database to use
    :return: A list of dictionaries that contain the reminder information
             (keys: id, message, due_time, receiver)
    """
    query = (
        "SELECT reminder.id, reminder.msg_text, reminder.due_time, "
        "address_book.address, address_book.id, address_book.display_name "
        "FROM reminder "
        "JOIN address_book ON reminder.sender_id = address_book.id "
        "WHERE reminder.sent = 0"
    )
    return [
        {
            "id": reminder_id,
            "message": text,
            "due_time": convert_string_to_datetime(due_time),
            # Contact receives (id, display name, address), in that order.
            "receiver": Contact(contact_id, display_name, address)
        }
        for reminder_id, text, due_time, address, contact_id, display_name
        in database.execute(query)
    ]
def thread_exists(thread: Dict[str, str or int], db: sqlite3.Connection) \
        -> bool:
    """
    Tells whether a reddit discussion thread with the given show name,
    episode and url is already stored
    :param thread: The thread to check for
    :param db: The database to check
    :return: True if the thread exists, false otherwise
    """
    lookup_values = (thread["show_name"], thread["episode"], thread["url"])
    cursor = db.execute(
        "SELECT * FROM anime_reminder_threads "
        "WHERE show_name=? AND episode=? AND thread=?",
        lookup_values
    )
    # A single fetched row is enough to know that a match exists.
    return cursor.fetchone() is not None
# noinspection SqlNoDataSourceInspection,SqlDialectInspection,SqlResolve
def define_language_text(self) -> Dict[str, Dict[str, str]]:
    """
    Supplies the translation table used by the translate() method.
    Implementations return a dictionary shaped like this:
        { key: {"language": "text_in_language", ...}, ... }
    Note that every occurrence of a 'key' value is replaced
    while translating
    :return: The dictionary to create translations with
    """
    # No default table exists; this version always raises.
    raise NotImplementedError()
# noinspection PyMethodMayBeStatic
async def send_alert_notification(settings: Dict, tmpl_args: Dict) -> None:
    """Render the alert templates and send them as one slack attachment.

    This is a coroutine: the body awaits ``send_slack_notification``, so it
    has to be declared with ``async def``.

    :param settings: must contain ``tmpl-msg``, ``tmpl-duration``,
        ``tmpl-url`` (objects with ``render``; the last two may be falsy to
        leave the field out) and ``webhook-url``
    :param tmpl_args: keyword arguments passed to every template render
    """
    summary = settings['tmpl-msg'].render(**tmpl_args)
    attachment = {
        'fallback': summary,
        'fields': [],
        'pretext': summary,
    }

    # Optional fields, emitted in this order when their template is set.
    optional_fields = (
        ('Duration', 'tmpl-duration'),
        ('URL', 'tmpl-url'),
    )
    for title, key in optional_fields:
        template = settings[key]
        if template:
            attachment['fields'].append({
                'title': title,
                'value': template.render(**tmpl_args),
                'short': False,
            })

    await send_slack_notification(settings['webhook-url'], [attachment])
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Parse slack notification settings.

    :param config: object whose ``get`` accepts a key and a ``fallback``
        keyword
    :return: the settings with every non-empty template compiled into a
        ``jinja2.Template``, or ``None`` when the webhook URL or the
        message template is missing
    """
    ret = {
        'webhook-url': config.get('slack-webhook-url'),
        'tmpl-msg': config.get('slack-tmpl-msg'),
        'tmpl-duration': config.get('slack-tmpl-duration', fallback=''),
        'tmpl-url': config.get('slack-tmpl-url', fallback='')
    }  # type: Any

    if not (ret['webhook-url'] and ret['tmpl-msg']):
        log.debug('Slack settings missing, no slack notifications will be sent', 'NOTIFICATIONS')
        return None

    log.debug('Valid slack notification settings found', 'NOTIFICATIONS')
    ret['tmpl-msg'] = jinja2.Template(ret['tmpl-msg'])
    # The remaining templates are optional; empty ones stay as they are.
    for optional_key in ('tmpl-duration', 'tmpl-url'):
        if ret[optional_key]:
            ret[optional_key] = jinja2.Template(ret[optional_key])
    return ret
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Parse clicksend sms settings.

    Should only be called from sms.parse_settings.

    :param config: object whose ``get`` returns the value of a setting
    :return: the settings with ``tmpl`` compiled into a ``jinja2.Template``,
        or ``None`` when the username, API key, sender or template is
        missing
    """
    ret = {
        'provider': 'clicksend',
        'username': config.get('sms-clicksend-username'),
        'api-key': config.get('sms-clicksend-api-key'),
        'sender': config.get('sms-clicksend-sender'),
        'tmpl': config.get('sms-tmpl'),
    }  # type: Any

    # Every one of these must be set. The values are looked up in ``ret``;
    # a bare ['sender'] literal is a non-empty list and therefore always
    # truthy, which would let a missing sender or template slip through.
    required = ('username', 'api-key', 'sender', 'tmpl')
    if not all(ret[key] for key in required):
        log.msg('SMS settings missing, no sms notifications will be sent', 'NOTIFICATIONS')
        return None

    log.debug('Valid SMS notification settings found', 'NOTIFICATIONS')
    ret['tmpl'] = jinja2.Template(ret['tmpl'])
    return ret