async def connect(self) -> None:
    """Open the transport, handshake with the server and mark it connected.

    Steps, in order:

    1. Open a UNIX-socket stream when ``self.host`` starts with ``/``,
       otherwise a TCP stream to ``self.host``/``self.port``.
    2. Apply the configured ``SO_KEEPALIVE`` setting to the underlying
       socket (skipped when the transport exposes no socket).
    3. Schedule ``self.read_loop()`` as a background task.
    4. Send ``ismaster`` to the ``admin`` database and copy the reported
       server properties and size limits onto this connection.
    5. Authenticate when credentials are configured.
    6. Set the internal "connected" event so waiters can proceed.

    No exception is caught here: a failure while connecting, running the
    ``ismaster`` command or authenticating propagates to the caller, and
    the "connected" event is then left unset.
    """
    # A host beginning with '/' is treated as a filesystem socket path.
    uses_unix_socket = self.host.startswith('/')

    # NOTE(review): the explicit ``loop=`` argument is kept for the Python
    # versions this file targets; the asyncio stream helpers no longer
    # accept it on Python 3.10+ -- confirm the supported version range.
    if uses_unix_socket:
        self.reader, self.writer = await asyncio.open_unix_connection(
            path=self.host, loop=self.loop
        )
        endpoint = self.host
    else:
        self.reader, self.writer = await asyncio.open_connection(
            host=self.host, port=self.port, loop=self.loop
        )
        endpoint = '{}:{}'.format(self.host, self.port)

    # get_extra_info() returns None when the transport has no such entry,
    # so only touch the socket when one is actually available.
    sock = self.writer.transport.get_extra_info('socket')
    if sock is not None:
        sock.setsockopt(
            socket.SOL_SOCKET,
            socket.SO_KEEPALIVE,
            int(self.options.pool_options.socket_keepalive),
        )

    logger.debug('Established connection to %s', endpoint)

    # The reader task is scheduled before the first command is sent, since
    # the handshake below already awaits a server reply.
    self.read_loop_task = asyncio.ensure_future(self.read_loop(), loop=self.loop)

    ismaster = IsMaster(await self.command(
        'admin', SON([('ismaster', 1)]), ReadPreference.PRIMARY, DEFAULT_CODEC_OPTIONS
    ))

    self.is_mongos = ismaster.server_type == SERVER_TYPE.Mongos
    self.max_wire_version = ismaster.max_wire_version

    # Size limits are only overridden when the server reports a truthy
    # value; otherwise the values already on the instance are kept.
    if ismaster.max_bson_size:
        self.max_bson_size = ismaster.max_bson_size
    if ismaster.max_message_size:
        self.max_message_size = ismaster.max_message_size
    if ismaster.max_write_batch_size:
        self.max_write_batch_size = ismaster.max_write_batch_size

    self.is_writable = ismaster.is_writable
    self.slave_ok = not self.is_mongos and self.options.read_preference != ReadPreference.PRIMARY

    if self.options.credentials:
        await self._authenticate()

    # Notify waiters that connection has been established
    self.__connected.set()
评论列表
文章目录