def first(self, query: str, values: Union[List, Dict],
db_name: str = 'default') -> Optional[DictRow]:
return await self._first(query=query, values=values, db_name=db_name)
python类Optional()的实例源码
def post_message(user: User, text: str, timestamp: Optional[Timestamp]=None) -> None:
user = intern(user)
timestamp = timestamp or time()
post = Post(timestamp, user, text)
posts.appendleft(post)
user_posts[user].appendleft(post)
def posts_by_user(user: User, limit: Optional[int] = None) -> List[Post]:
return list(islice(user_posts[user], limit))
def posts_for_user(user: User, limit: Optional[int] = None) -> List[Post]:
relevant = merge(*[user_posts[u] for u in following[user]], reverse=True)
return list(islice(relevant, limit))
def search(phrase:str, limit: Optional[int] = None) -> List[Post]:
# XXX this could benefit from caching and from preindexing
return list(islice((post for post in posts if phrase in post.text), limit))
def hash_password(password: str, salt: Optional[bytes] = None) -> HashAndSalt:
pepper = b'alchemists discovered that gold came from earth air fire and water'
salt = salt or secrets.token_bytes(16)
salted_pass = salt + password.encode('utf-8')
return hashlib.pbkdf2_hmac('sha512', salted_pass, pepper, 100000), salt
def write_and_open_in_browser(html: str, filepath: Optional[str]):
''' Writes html to filepath and opens it. '''
file = NamedTemporaryFile('w', encoding='utf-8', delete=False) if filepath is None else open(filepath, 'w', encoding='utf-8')
file.write(html)
file.close()
webbrowser.open(file.name)
def render(posts: Iterable[Post], output_filepath: Optional[str] = None):
''' Puts html from posts into output_filepath (.html) and opens that in a specified browser '''
posts = list(posts)
if posts:
write_and_open_in_browser(render_to_html(posts), output_filepath)
else: print('Nothing found')
def split_by_predicate(seq: Sequence[T],
predicate,
zero_delim: Optional[T]=None) \
-> Iterator[Sequence[T]]:
"""
Splits a sequence by delimeters that satisfy predicate,
keeping the delimeters
split_by_predicate([0, "One", 1, 2, 3,
"Two", 4, 5, 6, 7, "Three", "Four"],
predicate=lambda x: isinstance(x, str),
zero_delim="Nothing")
[["Nothing", 0], ["One", 1, 2, 3], ["Two", 4, 5, 6, 7],
["Three"], ["Four"]]
:param seq: sequence to proceed
:param predicate: checks whether element is delimeter
:param zero_delim: pseudo-delimter prepended to the sequence
:return: sequence of sequences
"""
g = [zero_delim]
for el in seq:
if predicate(el):
yield g
g = []
g.append(el)
yield g
# END BASED
def spawn_or_create_counter(parent: Optional[Counter]):
if parent is not None:
return parent.spawn_child()
else:
return Counter()
def url_for_eq_snippet(self, label: str) -> Optional[str]:
"""
Returns url for equation snippet by label
Should be implemented in subclass
:param label:
:return:
"""
raise NotImplementedError
def __init__(self, *,
warnings: Optional[bool]=None,
highlight: Optional[bool]=None,
frame_context_length: int=50):
self._show_warnings = self._env_bool(warnings, 'PY_DEVTOOLS_WARNINGS', True)
self._highlight = self._env_bool(highlight, 'PY_DEVTOOLS_HIGHLIGHT', None)
# 50 lines should be enough to make sure we always get the entire function definition
self._frame_context_length = frame_context_length
def mainFrame(self) -> Optional['Frame']:
"""Retrun main frame."""
return self._mainFrame
def __init__(self, client: Session, mouse: Mouse, touchscreen: Touchscreen,
parentFrame: Optional['Frame'], frameId: str) -> None:
"""Make new frame."""
self._client = client
self._mouse = mouse
self._touchscreen = touchscreen
self._parentFrame = parentFrame
self._url = ''
self._detached = False
self._id = frameId
self._defaultContextId = '<not-initialized>'
self._waitTasks: Set[WaitTask] = set() # maybe list
self._childFrames: Set[Frame] = set() # maybe list
if self._parentFrame:
self._parentFrame._childFrames.add(self)
def querySelector(self, selector: str) -> Optional['ElementHandle']:
"""Get element which matches `selector` string.
If `selector` matches multiple elements, return first-matched element.
"""
remoteObject = await self._rawEvaluate(
'selector => document.querySelector(selector)', selector)
if remoteObject.get('subtype') == 'node':
return ElementHandle(self._client, remoteObject, self._mouse,
self._touchscreen)
await helper.releaseObject(self._client, remoteObject)
return None
def querySelectorEval(self, selector: str, pageFunction: str,
*args: Any) -> Optional[Any]:
"""Execute function on element which matches selector."""
elementHandle = await self.querySelector(selector)
if elementHandle is None:
raise PageError(
f'Error: failed to find element matching selector "{selector}"'
)
result = await elementHandle.evaluate(pageFunction, *args)
await elementHandle.dispose()
return result
def clearTimeout(fut: Optional[Union[asyncio.Future, asyncio.Handle]]) -> None:
"""Cancel timer task."""
if fut:
fut.cancel()
def mainFrame(self) -> Optional['Frame']:
"""Get main frame."""
return self._frameManager._mainFrame
def querySelector(self, selector: str) -> Optional['ElementHandle']:
"""Get Element which matches `selector`."""
frame = self.mainFrame
if not frame:
raise PageError('no main frame.')
return await frame.querySelector(selector)
def querySelectorEval(self, selector: str, pageFunction: str,
*args: Any) -> Optional[Any]:
"""Execute function on element which matches selector."""
frame = self.mainFrame
if not frame:
raise PageError('no main frame.')
return await frame.querySelectorEval(selector, pageFunction, *args)