def _send_receive(self, nummsgs: int, outformat: str='json',
dataupdate: Optional[Dict[AnyStr, Any]]=None,
restart_data: bool=True) -> List[Response]:
if restart_data:
self._restart_data(outformat)
if dataupdate:
self.data.update(dataupdate)
self._add_to_buffer(nummsgs, outformat)
self.sendbuffer.seek(0)
processor, _ = get_processor_instance(
outformat,
custom_outbuffer=self.recvbuffer,
custom_inbuffer=self.sendbuffer
)
processor.process_requests(self.sendbuffer)
return self._loadResults(outformat)
python类Optional()的实例源码
def format(self, content: Optional[QqTag],
blanks_to_pars=True,
keep_end_pars=True) -> str:
"""
:param content: could be QqTag or any iterable of QqTags
:param blanks_to_pars: use blanks_to_pars (True or False)
:param keep_end_pars: keep end paragraphs
:return: str: text of tag
"""
if content is None:
return ""
out = []
for child in content:
if isinstance(child, str):
if blanks_to_pars:
out.append(self.blanks_to_pars(html_escape(
child, keep_end_pars)))
else:
out.append(html_escape(child))
else:
out.append(self.handle(child))
return "".join(out)
def get_counter_for_tag(self, tag: QqTag) -> Optional[Counter]:
name = tag.name
counters = self.counters
while True:
if tag.exists('nonumber'):
return None
current = counters.get(name)
if current is None:
return None
if isinstance(current, Counter):
return current
if isinstance(current, dict):
counters = current
tag = tag.parent
name = tag.name
continue
return None
def __init__(self, client: Session, mouse: Mouse, touchscreen: Touchscreen
) -> None:
"""Make new frame manager."""
super().__init__()
self._client = client
self._mouse = mouse
self._touchscreen = touchscreen
self._frames: Dict[str, Frame] = dict()
self._mainFrame: Optional[Frame] = None
client.on('Page.frameAttached',
lambda event: self._onFrameAttached(
event.get('frameId', ''), event.get('parentFrameId', ''))
)
client.on('Page.frameNavigated',
lambda event: self._onFrameNavigated(event.get('frame')))
client.on('Page.frameDetached',
lambda event: self._onFrameDetached(event.get('frameId')))
client.on('Runtime.executionContextCreated',
lambda event: self._onExecutionContextCreated(
event.get('context')))
def __init__(self, client: Session, ignoreHTTPSErrors: Any,
options: dict = None, **kwargs: Any) -> None:
"""Make new navigator watcher."""
if options is None:
options = {}
options.update(kwargs)
self._client = client
self._ignoreHTTPSErrors = ignoreHTTPSErrors
self._timeout = options.get('timeout', 3000)
self._idleTime = options.get('networkIdleTimeout', 1000)
self._idleTimer: Optional[Union[asyncio.Future, asyncio.Handle]] = None
self._idleInflight = options.get('networkIdleInflight', 2)
self._waitUntil = options.get('waitUntil', 'load')
if self._waitUntil not in ('load', 'networkidle'):
raise ValueError(
f'Unknown value for options.waitUntil: {self._waitUntil}')
def waitForNavigation(self, options: dict = None, **kwargs: Any
) -> Optional[Response]:
"""Wait navigation completes."""
if options is None:
options = dict()
options.update(kwargs)
watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
options)
responses: Dict[str, Response] = dict()
listener = helper.addEventListener(
self._networkManager,
NetworkManager.Events.Response,
lambda response: responses.__setitem__(response.url, response)
)
await watcher.waitForNavigation()
helper.removeEventListeners([listener])
return responses.get(self.url)
def __init__(self, exe, cache=None):
# type: (str, Optional[Cache]) -> None
if not os.path.isabs(exe):
exe = which(exe) # type: ignore
self.exe = unifilename(exe)
self.cache = cache
self._styledefinition = styledef_make()
self.allow_encoding_change = False
self.languages = [] # type: List[str]
self.initial_style = style_make()
# The are deleted after one call to minimize_errors
self.globaltempfiles = set() # type: Set[str]
# These are deleted after each round of attempts
self.tempfiles = set() # type: Set[str]
self.keeptempfiles = False
self.version_string = formatter_version(exe)
def cmdargs_for_style(self, formatstyle, filename=None):
# type: (Style, Optional[str]) -> List[str]
assert isinstance(formatstyle, Style)
configdata = bytestr(self.styletext(formatstyle))
sha = shahex(configdata)
cfg = os.path.join(tempfile.gettempdir(),
'whatstyle_rustfmt_%s/%s' % (sha, self.configfilename))
try:
dirpath = os.path.dirname(cfg)
os.makedirs(dirpath)
self.add_tempfile(dirpath)
except OSError as exc:
if exc.errno != errno.EEXIST:
raise
if not self.tempfile_exists(cfg):
writebinary(cfg, configdata)
self.add_tempfile(cfg)
cmdargs = ['--config-path', cfg]
return cmdargs
def __init__(self, database, tabledesc=None, timestamp=False):
# type: (str, Optional[Dict[str, str]], bool) -> None
self.database = database
if tabledesc is None:
tabledesc = self.prefixdesc()
self.tabledesc = tabledesc
timestampsql = self._sqtimestamp if timestamp else ''
sqcreate = self._sqcreate % timestampsql
self.kv_create = sqcreate.format(**tabledesc)
self.kv_get = self._sqget.format(**tabledesc)
self.kv_mget = self._sqmget.format(**tabledesc)
self.kv_put = self._sqput.format(**tabledesc)
self.kv_delete = self._sqdelete.format(**tabledesc)
self._connection = None # type: Optional[sqlite3.Connection]
self.sqlite_limit_variable_number = 999
self.support_mget = True
def mget(self, keys):
# type: (List[str]) -> List[Optional[bytes]]
rows = []
if self.support_mget:
try:
with self.conn as conn:
for somekeys in grouper(self.sqlite_limit_variable_number, keys):
keylist = list(somekeys)
questionmarks = ','.join(['?'] * len(keylist))
sql = self.kv_mget % questionmarks
for row in conn.execute(sql, keylist):
rows.append(row)
resultdict = dict(rows) # type: Dict[str, bytes]
rget = resultdict.get
return [rget(k) for k in keys]
except sqlite3.OperationalError:
self.support_mget = False
return [self.__get(k) for k in keys]
def mget(self, keys):
# type: (List[str]) -> List[Optional[bytes]]
if not keys:
return []
cached = []
uncached = [] # type: List[Tuple[int, Optional[bytes]]]
contentkeys = super(DedupKeyValueStore, self).mget(keys)
for idx, contentkey in enumerate(contentkeys):
if contentkey is None:
uncached.append((idx, None))
else:
sha = binary_type(contentkey)
cached.append((idx, unistr(sha)))
if not cached:
return [None for _, contentkey in uncached]
indices, existing_keys = zip(*cached)
existing_values = self.kvstore.mget(existing_keys)
idx_value_pairs = sorted(uncached + list(zip(indices, existing_values)))
return list([value for _, value in idx_value_pairs])
def mixtohash(self,
args=(), # type: Sequence[AnyStr]
exe=None, # type: Optional[str]
depfiles=(), # type: Sequence[str]
hashobj=None # type: Optional[Any]
):
# type: (...) -> Any
if hashobj is None:
hashobj = HASHFUNC()
for filename in depfiles:
hashobj.update(sysfilename(filename))
hashobj.update(filesha(filename))
hashobj.update(b'\x00')
for arg in args:
hashobj.update(sysfilename(arg))
hashobj.update(b'\x00')
if exe is not None:
hashobj.update(self.digest_for_exe(exe))
return hashobj
def apply(func, # type: Callable[..., bytes]
args=(), # type: Sequence[AnyStr]
exe=None, # type: Optional[str]
depfiles=(), # type: Sequence[str]
cache=None # type: Optional[Cache]
):
"""Applies func(*args) when the result is not present in the cache.
The result of func(*args) must be bytes and must not be None which is used as
cache-miss indicator. After evaluation of func the result is stored in the cache.
"""
key, value = None, None
if cache is not None:
hashobj = cache.mixtohash(args, exe=exe, depfiles=depfiles)
key = hashobj.hexdigest()
value = cache.get(key)
if value is None:
value = func(*args)
if key is not None:
cache.set(key, value)
return value
def unified_diff(filename, content2=None):
# type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]]
"""This function prints a unified diff of the contents of
filename and the standard input, when used from the command line
as follows:
echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt
We get this result:
---
+++
@@ -1 +1 @@
-123
+456
"""
use_stdin = content2 is None
if content2 is None:
# Read binary input stream
stdin = rawstream(sys.stdin)
econtent2 = bytestr(stdin.read())
else:
econtent2 = content2
exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='')
if use_stdin:
write('\n'.join(diff))
return exit_code, diff
def __init__(self,
username: str,
password: str,
botModule: str,
botconfig: Mapping,
numPlayers: int,
variant: Variant,
spectators: bool,
gameName: str,
*args,
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.username: str = username
self.password: str = password
module = importlib.import_module(botModule + '.bot')
self.botCls: Type[Bot] = module.Bot # type: ignore
self.botconfig: Mapping = botconfig
self.numPlayers: int = numPlayers
self.variant: Variant = variant
self.spectators: bool = spectators
self.gameName: str = gameName
self.conn: socketIO_client.SocketIO
self.tablePlayers: List[str] = []
self.readyToStart: bool = False
self.game: Optional[Game] = None
def isNowPlayable(self,
color: Optional[Color],
value: Optional[Value]) -> bool:
'''Returns True the color and/or value is playable'''
assert color is not None or value is not None
if color is not None and value is not None:
return self.isPlayable(color, value)
if color is not None:
playableValue = len(self.game.playedCards[color]) + 1
if (playableValue <= 5
and not self.locatedCount[color][playableValue]):
return True
return False
if value is not None:
for c in self.colors:
if (len(self.game.playedCards[c]) + 1 == value
and not self.locatedCount[c][value]):
return True
return False
assert False
def maybeGiveEarlyGameHint(self, highValue: bool) -> bool:
bestHint: Hint = Hint()
i: int
for i in range(1, self.game.numPlayers):
player: int = (self.position + i) % self.game.numPlayers
candidate: Optional[Hint]
candidate = self.bestEarlyHintForPlayer(player, highValue)
if candidate is not None and candidate.fitness >= 0:
handState: List[HandState] = self.handState(player)
if HandState.Playable not in handState:
candidate.fitness += (self.game.numPlayers - i) * 2
if candidate is not None and candidate.fitness > bestHint.fitness:
bestHint = candidate
if bestHint.fitness <= 0:
return False
bestHint.give(self)
return True
def deck_draw(self,
player: int,
deckidx: int,
color: Optional[Color],
value: Optional[Value]) -> None:
if player == self.botPosition:
card = self.bot.create_own_card(deckidx)
self.deck[deckidx] = card
self.bot.drew_card(deckidx)
else:
color = Suit(color).color(self.variant)
value = Rank(value).value_()
card = self.bot.create_player_card(player, deckidx, color, value)
self.deck[deckidx] = card
self.players[player].drew_card(deckidx)
self.bot.someone_drew(player, deckidx)
def register_command(handler: Callable[[argparse.Namespace], None],
main_parser: Optional[ArgParserType]=None,
) -> Callable[[argparse.Namespace], None]:
if main_parser is None:
main_parser = global_argparser
if id(main_parser) not in _subparsers:
subparsers = main_parser.add_subparsers(title='commands',
dest='command')
_subparsers[id(main_parser)] = subparsers
else:
subparsers = _subparsers[id(main_parser)]
@functools.wraps(handler)
def wrapped(args):
handler(args)
doc_summary = handler.__doc__.split('\n\n')[0]
inner_parser = subparsers.add_parser(handler.__name__.replace('_', '-'),
description=handler.__doc__,
help=doc_summary)
inner_parser.set_defaults(function=wrapped)
wrapped.register_command = functools.partial(register_command,
main_parser=inner_parser)
wrapped.add_argument = inner_parser.add_argument
return wrapped
def _list(cls, user_id: int,
is_active: Optional[bool]=None,
fields: Optional[Iterable[str]]=None):
if fields is None:
fields = (
'access_key', 'secret_key',
'is_active', 'is_admin',
)
q = 'query($user_id: Int!, $is_active: Boolean) {' \
' keypairs(user_id: $user_id, is_active: $is_active) {' \
' $fields' \
' }' \
'}'
q = q.replace('$fields', ' '.join(fields))
vars = {
'user_id': user_id,
'is_active': is_active,
}
resp = yield Request('POST', '/admin/graphql', {
'query': q,
'variables': vars,
})
data = resp.json()
return data['keypairs']
def _get_or_create(cls, lang: str,
client_token: Optional[str]=None,
mounts: Optional[Iterable[str]]=None,
envs: Optional[Mapping[str, str]]=None,
max_mem: int=0, exec_timeout: int=0) -> str:
if client_token:
assert len(client_token) > 8
else:
client_token = uuid.uuid4().hex
resp = yield Request('POST', '/kernel/create', {
'lang': lang,
'clientSessionToken': client_token,
'config': {
'mounts': mounts,
'envs': envs,
},
# 'limits': {
# 'maxMem': max_mem,
# 'execTimeout': exec_timeout,
# },
})
data = resp.json()
o = cls(data['kernelId']) # type: ignore
o.created = data.get('created', True) # True is for legacy
return o
def __init__(self, endpoint: Optional[str]=None,
version: str='v2.20170315',
user_agent: Optional[str]=None,
access_key: Optional[str]=None,
secret_key: Optional[str]=None,
hash_type: Optional[str]=None) -> None:
self._endpoint = \
endpoint if endpoint else get_env('ENDPOINT', self.DEFAULTS['endpoint'])
self._version = \
version if version else self.DEFAULTS['version']
self._user_agent = \
user_agent if user_agent else self.DEFAULTS['user_agent']
self._access_key = \
access_key if access_key else get_env('ACCESS_KEY')
self._secret_key = \
secret_key if secret_key else get_env('SECRET_KEY')
self._hash_type = \
hash_type.lower() if hash_type else 'sha256'
def indices_to_load_by_target_index(allowed_characters_for_loaded_model: List[chr],
allowed_characters: List[chr]) -> List[Optional[int]]:
load_character_set = set(allowed_characters_for_loaded_model)
target_character_set = set(allowed_characters)
ignored = load_character_set - target_character_set
if ignored:
log("Ignoring characters {} from loaded model.".format(sorted(ignored)))
extra = target_character_set - load_character_set
if extra:
log("Initializing extra characters {} not found in model.".format(sorted(extra)))
def character_index_to_load(target_character: chr) -> Optional[int]:
return single_or_none([index for index, character in enumerate(allowed_characters_for_loaded_model) if
character == target_character])
character_mapping = [character_index_to_load(character) for character in allowed_characters]
log("Character mapping: {}".format(character_mapping))
return character_mapping
def __init__(self,
get_raw_audio: Callable[[], ndarray],
sample_rate: int = 16000,
id: Optional[str] = None,
label: Optional[str] = "nolabel",
fourier_window_length: int = 512,
hop_length: int = 128,
mel_frequency_count: int = 128,
label_with_tags: str = None,
positional_label: Optional[PositionalLabel] = None):
super().__init__(id=id, label=label)
# The default values for hop_length and fourier_window_length are powers of 2 near the values specified in the wave2letter paper.
self.get_raw_audio = get_raw_audio
self.sample_rate = sample_rate
self.fourier_window_length = fourier_window_length
self.hop_length = hop_length
self.mel_frequency_count = mel_frequency_count
self.label_with_tags = label_with_tags
self.positional_label = positional_label
def load(corpus_csv_file: Path,
sampled_training_example_count: Optional[int] = None) -> 'Corpus':
import csv
with corpus_csv_file.open(encoding='utf8') as opened_csv:
reader = csv.reader(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
def to_absolute(audio_file_path: Path) -> Path:
return audio_file_path if audio_file_path.is_absolute() else Path(
corpus_csv_file.parent) / audio_file_path
examples = [
(
LabeledExampleFromFile(
audio_file=to_absolute(Path(audio_file_path)), id=id, label=label,
positional_label=None if positional_label == "" else PositionalLabel.deserialize(
positional_label)), Phase[phase])
for id, audio_file_path, label, phase, positional_label in reader]
return Corpus(training_examples=[e for e, phase in examples if phase == Phase.training],
test_examples=[e for e, phase in examples if phase == Phase.test],
sampled_training_example_count=sampled_training_example_count)
def get_max_q_values(
self,
next_states: np.ndarray,
possible_next_actions: Optional[np.ndarray] = None,
use_target_network: Optional[bool] = True
) -> np.ndarray:
q_values = self.get_q_values_all_actions(
next_states, use_target_network
)
if possible_next_actions is not None:
mask = np.multiply(
np.logical_not(possible_next_actions),
self.ACTION_NOT_POSSIBLE_VAL
)
q_values += mask
return np.max(q_values, axis=1, keepdims=True)
def get_q_values_all_actions(
self, states: np.ndarray, use_target_network: Optional[bool] = True
) -> np.ndarray:
"""
Takes in a set of states and runs the test Q Network on them.
Creates Q(states, actions), a blob with shape (batch_size, action_dim).
Q(states, actions)[i][j] is an approximation of Q*(states[i], action_j).
Note that action_j takes on every possible action (of which there are
self.action_dim_. Stores blob in self.output_blob and returns its value.
:param states: Numpy array with shape (batch_size, state_dim). Each row
contains a representation of a state.
:param possible_next_actions: Numpy array with shape (batch_size, action_dim).
possible_next_actions[i][j] = 1 iff the agent can take action j from
state i.
:use_target_network: Boolean that indicates whether or not to use this
trainer's TargetNetwork to compute Q values.
"""
if use_target_network:
return self.target_network.target_values(states)
return self.score(states)
def preprocess_samples(
self,
states: List[Dict[str, float]],
actions: List[str],
rewards: List[float],
next_states: List[Dict[str, float]],
next_actions: List[str],
is_terminals: List[bool],
possible_next_actions: List[List[str]],
reward_timelines: Optional[List[Dict[int, float]]],
) -> TrainingDataPage:
tdp = self.preprocess_samples_discrete(
states, actions, rewards, next_states, next_actions, is_terminals,
possible_next_actions, reward_timelines
)
tdp.states = np.where(tdp.states == 1.0)[1].reshape(-1, 1
).astype(np.float32)
tdp.next_states = np.where(tdp.next_states == 1.0)[1].reshape(
-1, 1
).astype(np.float32)
return tdp
def add_assignment(
self, *,
service_code: netsgiro.ServiceCode,
assignment_type: netsgiro.AssignmentType,
agreement_id: Optional[str]=None,
number: str,
account: str,
date: Optional[datetime.date]=None
) -> 'Assignment':
"""Add an assignment to the tranmission."""
assignment = Assignment(
service_code=service_code,
type=assignment_type,
agreement_id=agreement_id,
number=number,
account=account,
date=date,
)
self.assignments.append(assignment)
return assignment
def create(cls, texts: List[Any], name: str, *, locations: Opt[List[float]] = None,
durations: Opt[List[float]] = None, default: bool = False) -> 'SubtitleTrack':
subtitles = []
if locations:
start_times, end_times = loc_util.start_end_locations_from_locations(locations)
elif durations:
start_times, end_times = loc_util.start_end_locations_from_intervals(durations)
else:
raise ex.ParameterError(f"Must provide either locations or durations for the subtitle track {name}")
for index, (text, start_time, end_time) in enumerate(zip(texts, start_times, end_times)):
subtitle = Subtitle(str(text), start_time, end_time)
subtitles.append(subtitle)
subtitle_track = cls(subtitles, name, default=default)
return subtitle_track