def get(self,
labels: LabelsType) -> Dict[Union[float, str], NumericValueType]:
'''
Get gets a dict of values, containing the sum, count and percentiles,
matching an arbitrary group of labels.
:raises: KeyError if an item with matching labels is not present.
'''
return_data = {} # type: Dict[Union[float, str], NumericValueType]
e = self.get_value(labels)
e = cast(Any, e) # typing check, no runtime behaviour.
# Set invariants data (default to 0.50, 0.90 and 0.99)
for i in e._invariants: # type: ignore
q = i._quantile
return_data[q] = e.query(q) # type: ignore
# Set sum and count
return_data[self.SUM_KEY] = e._sum # type: ignore
return_data[self.COUNT_KEY] = e._observations # type: ignore
return return_data
python类cast()的实例源码
def get(self,
labels: LabelsType) -> Dict[Union[float, str], NumericValueType]:
'''
Get gets a dict of values, containing the sum, count and buckets,
matching an arbitrary group of labels.
:raises: KeyError if an item with matching labels is not present.
'''
return_data = {} # type: Dict[Union[float, str], NumericValueType]
h = self.get_value(labels)
h = cast(histogram.Histogram, h) # typing check, no runtime behaviour.
for upper_bound, cumulative_count in h.buckets.items():
return_data[upper_bound] = cumulative_count # keys are floats
# Set sum and count
return_data[self.SUM_KEY] = h.sum
return_data[self.COUNT_KEY] = h.observations
return return_data
def handle(self, config, out, force, wait, post_hook, **options):
server_id, auth_token = self._require_credentials(config)
server = Server.retrieve(server_id, auth_token)
while wait and server.csr_pending:
time.sleep(5)
server = Server.retrieve(server_id, auth_token)
tag = self._get_tag(config) if (not force) else None
result = server.get_pki(tag)
if result is not PKI.NOT_MODIFIED:
pki = cast(PKI, result)
if pki.entity is not None:
self._handle_pki(result, config, out, post_hook)
print("Certificates saved to {}.".format(out), file=self.stdout)
else:
print("No certificate available. Request one with req.", file=self.stdout)
else:
print("Not modified. Pass -f to download anyway.", file=self.stdout)
def __init__(self, source: Union['SourceTable', 'SourceQuery', 'File'], target: Union[str, Table], auto_map: bool = True, filter='', ignore_fields: List[str] = []) -> None:
#todo transformations
if isinstance(source, File):
self.file_name = source.file_name
elif isinstance(source, SourceTable):
self.source_table = source
elif isinstance(source, SourceQuery):
self.source_table = source
if isinstance(target, str):
self.sor_table = str(target) #type: str
else:
target_tbl = cast(Table, target)
self.sor_table = target_tbl.name
super().__init__(source, target, filter)
self.temp_table = self.sor_table.replace('_hstage', '') + '_temp'
#todo keys
self.keys = [] #type: List[str]
ignore_fields = [s.lower() for s in ignore_fields]
self.ignore_fields = ignore_fields
# self.field_mappings = [] #type: List[FieldMapping]
self.auto_map = auto_map
if auto_map: self.create_auto_mappings(source, ignore_fields)
def test_10_send_response_json(self) -> None:
self._restart_data('json')
processor = RequestProcessorJSON(self.recvbuffer)
processor._send_response(cast(Response, self.data))
res = self._loadResults('json')
self.assertEqual(len(res), 1)
self.assertDictEqual(self.data, res[0])
# process request already tested with TestPythonDriverBase
def updateEyesightCount(self) -> None:
self.eyesightCount = {c: [0] * 6 for c in self.colors}
p: Player
c: int
card: CardKnowledge
for p in self.game.players:
for c in p.hand:
card = cast(CardKnowledge, self.game.deck[c])
if card.suit is not None and card.rank is not None:
self.eyesightCount[card.suit][card.rank] += 1
elif card.color is not None and card.value is not None:
self.eyesightCount[card.color][card.value] += 1
def updateLocatedCount(self) -> bool:
'''Returns True if played/discarded cards has changed'''
newCount: Dict[Color, List[int]] = {c: [0] * 6 for c in self.colors}
p: Player
c: int
for p in self.game.players:
for c in p.hand:
card = cast(CardKnowledge, self.game.deck[c])
if card.color is not None and card.value is not None:
newCount[card.color][card.value] += 1
if newCount != self.locatedCount:
self.locatedCount = newCount
return True
return False
def handState(self,
player: int,
showCritical: bool=True) -> List[HandState]:
handState: List[HandState]
handState = [HandState.Unclued] * len(self.game.players[player].hand)
c: int
h: int
card: CardKnowledge
for c, h in enumerate(self.game.players[player].hand):
card = cast(CardKnowledge, self.game.deck[h])
if card.worthless is True:
handState[c] = HandState.Worthless
continue
if card.playWorthless is True:
handState[c] = HandState.Worthless
continue
if card.playable is True:
handState[c] = HandState.Playable
continue
if card.valuable is True:
handState[c] = HandState.Saved
continue
if card.clued:
handState[c] = HandState.SoonPlay
continue
if showCritical and player != self.position:
if self.isValuable(card.suit, card.rank):
handState[c] = HandState.Critical
elif card.rank == Value.V2 and self.is2Valuable(card.suit):
handState[c] = HandState.Critical2
return handState
def isCluedElsewhere(self, player: int, hand: int) -> bool:
returnVal: bool = False
cardIdx: int = self.game.players[player].hand[hand]
handcard: CardKnowledge
handcard = cast(CardKnowledge, self.game.deck[cardIdx])
color: Color = handcard.suit
value: Value = handcard.rank
p: Player
c: int
card: CardKnowledge
for p in self.game.players:
for c in p.hand:
card = cast(CardKnowledge, self.game.deck[c])
if card.deckPosition == handcard.deckPosition:
continue
if p is self:
if card.mustBeColor(color):
if card.mustBeValue(value):
return True
if card.cannotBeValue(value) and card.clued:
returnVal = None
elif card.mustBeValue(value):
if card.cannotBeColor(color) and card.clued:
returnVal = None
else:
if (card.clued
and card.suit == color
and card.rank == value):
return True
elif card.color == color and card.value == value:
return True
return returnVal
def cluedCard(self,
color: Color,
value: Value,
player: Optional[int]=None,
strict: bool=False,
maybe: bool=False) -> Optional[int]:
p: Player
c: int
card: CardKnowledge
for p in self.game.players:
if player == p.position:
if strict:
continue
# When it is the player, assume fully tagged cards as clued too
for c in p.hand:
card = cast(CardKnowledge, self.game.deck[c])
if card.color == color and card.value == value:
return card.deckPosition
if p is self:
if (maybe and card.maybeColor == color
and card.maybeValue == value):
return card.deckPosition
elif p is self:
for c in p.hand:
card = cast(CardKnowledge, self.game.deck[c])
if card.color == color and card.value == value:
return card.deckPosition
if (maybe and card.maybeColor == color
and card.maybeValue == value):
return card.deckPosition
else:
for c in p.hand:
card = cast(CardKnowledge, self.game.deck[c])
if (card.clued
and card.suit == color
and card.rank == value):
return card.deckPosition
return None
def doesCardMatchHand(self, deckIdx: int) -> bool:
deckCard: CardKnowledge
deckCard = cast(CardKnowledge, self.game.deck[deckIdx])
assert deckCard.suit is not None
assert deckCard.rank is not None
if self.colorComplete[deckCard.suit]:
return False
if deckCard.rank == Value.V5:
return False
h: int
for h in self.hand:
card: CardKnowledge
card = cast(CardKnowledge, self.game.deck[h])
if not card.clued:
continue
if card.worthless or card.playWorthless:
continue
if card.cantBe[deckCard.suit][deckCard.rank]:
continue
if card.color is not None and card.value is not None:
continue
if card.color == deckCard.suit:
maybeValue: Optional[Value] = card.maybeValue
if maybeValue is not None:
if maybeValue == deckCard.rank:
return True
continue
if deckCard.rank in card.possibleValues:
return True
if card.value == deckCard.rank:
maybeColor: Optional[Color] = card.maybeColor
if maybeColor is not None:
if maybeColor == deckCard.suit:
return True
continue
if deckCard.suit in card.possibleColors:
return True
return False
def pleaseObserveBeforeDiscard(self,
from_: int,
card_index: int,
deckIdx: int) -> None:
card: CardKnowledge = cast(CardKnowledge, self.game.deck[deckIdx])
card.state = CardState.Discard
self.seePublicCard(card.suit, card.rank)
def srand(seed=0):
# type: (KeyType) -> typing.Generator[int, None, None]
if isinstance(seed, six.string_types) or isinstance(seed, bytes):
if isinstance(seed, six.text_type):
seed = seed.encode('utf-8')
seed_int = int(hashlib.sha512(seed).hexdigest(), 16)
seed = typing.cast(int, seed_int)
rng = random.Random(seed)
while True:
yield rng.randint(0, sys.maxsize)
def configure_logging(logtype: str, logfilename: Optional[str]=None, debug_logging: bool=False,
rotate_length: int=1000000, max_rotated_files: int=250) -> None:
global logger
level = logging.INFO
if debug_logging:
level = logging.DEBUG
if logtype not in ['stdout', 'syslog', 'file']:
raise errors.IrisettError('invalid logtype name %s' % logtype)
if rotate_length is None:
rotate_length = 1000000
if max_rotated_files is None:
max_rotated_files = 250
logger = logging.getLogger('irisett')
logger.setLevel(level)
if logtype == 'stdout':
handler = logging.StreamHandler() # type: Any
handler.setLevel(level)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
elif logtype == 'syslog':
handler = logging.handlers.SysLogHandler(address='/dev/log')
handler.setLevel(level)
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
else: # == file
logfilename = cast(str, logfilename)
logpath = os.path.split(logfilename)[0]
if not os.path.exists(logpath):
os.makedirs(logpath)
handler = logging.handlers.RotatingFileHandler(logfilename, maxBytes=rotate_length,
backupCount=max_rotated_files)
handler.setLevel(level)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
def require_list(value: Optional[List[Any]], item_type: Any=None, allow_none: bool=False) -> Any:
"""Make sure a value is a List[item_type].
Used when dealing with http input data.
"""
if value is None and allow_none:
return value
if type(value) != list:
raise InvalidData('value was %s, expected list' % type(value))
value = cast(List, value)
if item_type:
for item in value:
if type(item) != item_type:
raise InvalidData('list item was %s, expected %s' % (type(item), item_type))
return value
def require_int(value: Optional[Union[SupportsInt, str, bytes]], allow_none: bool=False) -> Any:
"""Make sure a value is an int.
Used when dealing with http input data.
"""
if value is None and allow_none:
return value
value = cast(Union[SupportsInt, str, bytes], value)
try:
value = int(value)
except (ValueError, TypeError):
raise InvalidData('value was %s(%s), expected list' % (type(value), value))
return value
def update_monitor(self) -> web.Response:
request_data = await self.request.json()
monitor = self._get_request_monitor(self.request)
if 'args' in request_data:
args = cast(Dict[str, str], require_dict(request_data['args']))
await monitor.update_args(args)
if 'checks_enabled' in request_data:
await monitor.set_checks_enabled_status(cast(bool, require_bool(request_data['checks_enabled'])))
if 'alerts_enabled' in request_data:
await monitor.set_alerts_enabled_status(cast(bool, require_bool(request_data['alerts_enabled'])))
return web.json_response(True)
def _get_request_monitor(self, request: web.Request) -> ActiveMonitor:
monitor_id = require_int(cast(str, get_request_param(request, 'id')))
monitor = request.app['active_monitor_manager'].monitors.get(monitor_id, None)
if not monitor:
raise errors.NotFound()
return monitor
def get(self) -> web.Response:
monitor_id = cast(int, require_int(get_request_param(self.request, 'monitor_id')))
if 'include_all' in self.request.rel_url.query:
contacts = await get_all_contacts_for_active_monitor(self.request.app['dbcon'], monitor_id)
else:
contacts = object_models.asdict(
await get_contacts_for_active_monitor(self.request.app['dbcon'], monitor_id)
)
ret = object_models.list_asdict(contacts)
return web.json_response(ret)
def post(self) -> web.Response:
request_data = await self.request.json()
await add_contact_to_active_monitor(
self.request.app['dbcon'],
cast(int, require_int(request_data.get('contact_id'))),
cast(int, require_int(request_data.get('monitor_id'))))
return web.json_response(True)