async def update_monitor_group(dbcon: DBConnection, monitor_group_id: int, data: Dict[str, Any]):
    """Update a monitor group in the database.

    Coroutine. ``data`` is a dict whose keys are limited to ``parent_id``
    and ``name``; each entry is written to the matching column of the
    ``monitor_groups`` row with id ``monitor_group_id``.

    Raises:
        errors.IrisettError: if ``data`` contains any other key.
        errors.InvalidArguments: if a truthy ``parent_id`` equals
            ``monitor_group_id`` or refers to a group that does not exist.
    """
    async def _run(cur: Cursor) -> None:
        for key, value in data.items():
            if key not in ('parent_id', 'name'):
                raise errors.IrisettError('invalid monitor_group key %s' % key)
            if key == 'parent_id' and value:
                if monitor_group_id == int(value):
                    raise errors.InvalidArguments('monitor group can\'t be its own parent')
                if not await monitor_group_exists(dbcon, value):
                    raise errors.InvalidArguments('parent monitor group does not exist')
            # Interpolating the column name is safe here: key has been
            # checked against the whitelist above. Values stay parameterized.
            q = """update monitor_groups set %s=%%s where id=%%s""" % key
            q_args = (value, monitor_group_id)
            await cur.execute(q, q_args)

    # NOTE(review): presumably transact() runs _run inside a transaction so a
    # validation error rolls back earlier column updates -- confirm.
    await dbcon.transact(_run)
# Example source code using the Python class Dict()
def require_dict(value: Optional[Dict[Any, Any]], key_type: Any=None, value_type: Any=None,
                 allow_none: bool=False) -> Any:
    """Make sure a value is a Dict[key_type, value_type].

    Used when dealing with http input data.

    Args:
        value: The object to validate.
        key_type: If set, every key must be exactly of this type.
        value_type: If set, every value must be exactly of this type.
        allow_none: If true, a ``None`` value is accepted and returned as is.

    Returns:
        ``value`` unchanged when it passes validation.

    Raises:
        InvalidData: if ``value`` is not exactly a ``dict``, or a key/value
            is not exactly of the requested type.
    """
    if value is None and allow_none:
        return value
    # Exact type match on purpose: subclasses (e.g. bool for int) are rejected.
    if type(value) is not dict:
        raise InvalidData('value was %s(%s), expected dict' % (type(value), value))
    value = cast(Dict, value)
    if key_type or value_type:
        for k, v in value.items():
            if key_type and type(k) is not key_type:
                raise InvalidData('dict key was %s(%s), expected %s' % (type(k), k, key_type))
            if value_type and type(v) is not value_type:
                # Report the expected *value* type (previously key_type was shown).
                raise InvalidData('dict value was %s(%s), expected %s' % (type(v), v, value_type))
    return value
async def _get_monitor_metadata(self, dbcon: DBConnection) -> Optional[Dict[int, Dict[str, str]]]:
    """Collect active monitor metadata for the current request, if requested.

    Coroutine. Returns None unless the ``include_metadata`` request
    parameter evaluates to true. Otherwise the metadata models are selected
    by the first query parameter present, in this order: ``id``,
    ``meta_key`` (together with ``meta_value``), ``monitor_group_id``; with
    none of them present, all metadata of object type ``active_monitor`` is
    fetched.

    Returns:
        A dict mapping object id -> {metadata key: metadata value}, or None.
    """
    include_metadata = require_bool(
        get_request_param(self.request, 'include_metadata', error_if_missing=False),
        convert=True) or False
    if not include_metadata:
        return None
    query = self.request.rel_url.query
    if 'id' in query:
        metadata_models = await metadata.get_metadata_for_object(
            dbcon, 'active_monitor', require_int(cast(str, get_request_param(self.request, 'id'))))
    elif 'meta_key' in query:
        meta_key = require_str(get_request_param(self.request, 'meta_key'))
        meta_value = require_str(get_request_param(self.request, 'meta_value'))
        metadata_models = await metadata.get_metadata_for_object_metadata(
            dbcon, meta_key, meta_value, 'active_monitor', 'active_monitors')
    elif 'monitor_group_id' in query:
        metadata_models = await monitor_group.get_active_monitor_metadata_for_monitor_group(
            dbcon, require_int(cast(str, get_request_param(self.request, 'monitor_group_id'))))
    else:
        metadata_models = await metadata.get_metadata_for_object_type(dbcon, 'active_monitor')
    # Group the flat list of metadata models by the object they belong to.
    metadata_dict = {}  # type: Dict[int, Dict[str, str]]
    for metadata_model in metadata_models:
        per_object = metadata_dict.setdefault(metadata_model.object_id, {})
        per_object[metadata_model.key] = metadata_model.value
    return metadata_dict
def __init__(self, config: Dict):
    """Keep the given configuration and start with an empty ``dbs`` mapping."""
    self.dbs = {}
    self.config = config
async def select(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default') -> List[DictRow]:
    """Coroutine: run ``query`` via ``self._select`` and return its result.

    Args:
        query: SQL text to execute.
        values: Query parameters (list or dict).
        db_name: Key of the database entry to use.
    """
    return await self._select(query=query, values=values, db_name=db_name)
async def first(self, query: str, values: Union[List, Dict],
                db_name: str = 'default') -> Optional[DictRow]:
    """Coroutine: run ``query`` via ``self._first`` and return its result.

    Args:
        query: SQL text to execute.
        values: Query parameters (list or dict).
        db_name: Key of the database entry to use.
    """
    return await self._first(query=query, values=values, db_name=db_name)
async def insert(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default', returning: bool = False):
    """Coroutine: run an insert statement via ``self._execute``.

    Args:
        query: SQL text to execute.
        values: Query parameters (list or dict).
        db_name: Key of the database entry to use.
        returning: Passed through to ``self._execute``.

    Returns:
        Whatever ``self._execute`` returns.
    """
    return await self._execute(query=query, values=values, db_name=db_name, returning=returning)
async def update(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default', returning: bool = False):
    """Coroutine: run an update statement via ``self._execute``.

    Args:
        query: SQL text to execute.
        values: Query parameters (list or dict).
        db_name: Key of the database entry to use.
        returning: Passed through to ``self._execute``.

    Returns:
        Whatever ``self._execute`` returns.
    """
    return await self._execute(query=query, values=values, db_name=db_name, returning=returning)
async def _execute(self, query: str, values: Union[List, Dict], db_name: str = 'default',
                   returning: bool = False):
    """Coroutine: execute a statement on the ``master`` pool of ``db_name``.

    Args:
        query: SQL text to execute.
        values: Query parameters (list or dict).
        db_name: Key into ``self.dbs``.
        returning: When true, return the first row fetched from the cursor;
            otherwise return the cursor's ``rowcount``.

    Raises:
        RuntimeError: if the master pool entry is None.
    """
    pool = self.dbs[db_name]['master']
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            if returning:
                return await cursor.fetchone()
            return cursor.rowcount
async def _select(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
    """Coroutine: execute ``query`` and return all fetched rows.

    The ``slave`` pool of ``db_name`` is used when present and truthy,
    otherwise the ``master`` pool.

    Raises:
        RuntimeError: if neither pool is available.
    """
    dbs = self.dbs[db_name]
    pool = dbs.get('slave') or dbs.get('master')
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            return await cursor.fetchall()
async def _first(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
    """Coroutine: execute ``query`` and return the first fetched row.

    The ``slave`` pool of ``db_name`` is used when present and truthy,
    otherwise the ``master`` pool.

    Raises:
        RuntimeError: if neither pool is available.
    """
    dbs = self.dbs[db_name]
    pool = dbs.get('slave') or dbs.get('master')
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            return await cursor.fetchone()
def _create_request_data(request: Request) -> Dict:
    """Summarize a request as a plain dict.

    The result holds the stringified URL, the query string, the method and
    a ``dict`` copy of the headers.
    """
    return dict(
        url=str(request.url),
        query_string=request.query_string,
        method=request.method,
        headers=dict(request.headers),
    )
def assign_data(centroids: Sequence[Centroid], data: Iterable[Point]) -> Dict[Centroid, Sequence[Point]]:
    'Group each data point under the centroid closest to it'
    groups: Dict[Point, List[Point]] = {}
    for point in data:
        # min() keeps the first centroid on ties, same as any key-based min.
        nearest: Point = min(centroids, key=lambda c: dist(point, c))
        groups.setdefault(nearest, []).append(point)
    return groups
def quality(labeled: Dict[Centroid, Sequence[Point]]) -> float:
    'Average of the squared distance between every point and its assigned centroid'
    squared_distances = []
    for centroid, points in labeled.items():
        for point in points:
            squared_distances.append(dist(centroid, point) ** 2)
    return mean(squared_distances)
def calc_rs_pos(self) -> Dict[str, float]:
    """Calculate the share of each part of speech among the words of the input text.

    Every sentence in ``self.sentences`` is analysed with ``self.juman`` and
    the ``hinsi`` of each resulting morpheme is counted.

    Returns:
        Dict[str, float]: maps each ``hinsi`` value to its count divided by
        the total number of morphemes.
    """
    pos_counter = Counter()
    # TODO: It may take a long time when the number of sentences are large
    for sentence in self.sentences:
        analysed = self.juman.analysis(sentence)
        pos_counter.update(mrph.hinsi for mrph in analysed.mrph_list())
    total = sum(pos_counter.values())
    return {name: float(num) / total for name, num in pos_counter.items()}
async def setExtraHTTPHeaders(self, extraHTTPHeaders: Dict[str, str]
                              ) -> None:
    """Set extra http headers.

    Coroutine. Stores an ordered copy of ``extraHTTPHeaders`` on the
    instance and sends a second, independent copy to the client with the
    ``Network.setExtraHTTPHeaders`` command.
    """
    self._extraHTTPHeaders = OrderedDict(extraHTTPHeaders.items())
    # Separate copy so the payload handed to the client never aliases
    # the state kept on the instance.
    headers = OrderedDict(extraHTTPHeaders.items())  # type: Dict[str, str]
    await self._client.send('Network.setExtraHTTPHeaders',
                            {'headers': headers})
def extraHTTPHeaders(self) -> Dict[str, str]:
    """Get extra http headers.

    Returns a new plain ``dict`` copy of the stored headers, so callers
    cannot modify the instance state through the result.
    """
    # dict(mapping) copies without routing keys through keyword arguments,
    # which would raise TypeError for any non-string key.
    return dict(self._extraHTTPHeaders)
async def setExtraHTTPHeaders(self, headers: Dict[str, str]):
    """Set extra http headers.

    Coroutine. Delegates to ``self._networkManager.setExtraHTTPHeaders``
    and returns its result.
    """
    return await self._networkManager.setExtraHTTPHeaders(headers)
async def goto(self, url: str, options: dict = None, **kwargs: Any
               ) -> Optional[Response]:
    """Go to url.

    Coroutine. Sends ``Page.navigate`` for ``url`` and waits for the
    navigation watcher to finish.

    Args:
        url: Address to navigate to.
        options: Optional navigation options handed to ``NavigatorWatcher``.
            The caller's dict is not modified.
        **kwargs: Additional options; they override entries of ``options``.

    Returns:
        The response recorded for ``self.url``, or None if none was seen.

    Raises:
        PageError: if the frame manager reports that loading the main
            frame failed.
    """
    # Merge into a fresh dict so the caller's options are never mutated.
    options = {**(options or {}), **kwargs}
    watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
                               options)

    responses: Dict[str, Response] = dict()
    listener = helper.addEventListener(
        self._networkManager, NetworkManager.Events.Response,
        lambda response: responses.__setitem__(response.url, response)
    )
    try:
        result = asyncio.ensure_future(watcher.waitForNavigation())
        referrer = self._networkManager.extraHTTPHeaders().get('referer', '')
        try:
            await self._client.send('Page.navigate',
                                    dict(url=url, referrer=referrer))
        except Exception:
            watcher.cancel()
            raise
        await result
    finally:
        # Always detach the response listener, including when navigation
        # raises, so repeated failures do not accumulate listeners.
        helper.removeEventListeners([listener])

    if self._frameManager.isMainFrameLoadingFailed():
        raise PageError('Failed to navigate: ' + url)
    return responses.get(self.url)
def addEventListener(emitter: EventEmitter, eventName: str, handler: Callable
                     ) -> Dict[str, Any]:
    """Register ``handler`` for ``eventName`` on ``emitter``.

    Returns a dict with the ``emitter``, ``eventName`` and ``handler`` that
    were registered.
    """
    registration = dict(emitter=emitter, eventName=eventName, handler=handler)
    emitter.on(eventName, handler)
    return registration