cli.py 文件源码

python
阅读 18 收藏 0 点赞 0 评论 0

项目:clickhouse-cli 作者: hatarist 项目源码 文件源码
def connect(self):
        """Build the HTTP client and verify the server by querying its version.

        If ``self.host`` contains a scheme (``'://'``), it is parsed as a URL
        and ``self.host``, ``self.port`` and ``self.scheme`` are overwritten
        with its components; a port missing from the URL keeps the current
        ``self.port``. Otherwise the scheme defaults to ``'http'``.

        On success ``self.server_version`` is set to a 3-tuple of ints taken
        from the first three dot-separated components of the reply to
        ``SELECT version();``.

        Returns:
            True when the version query succeeded and its reply was parsed;
            False when the query timed out, the connection failed, the server
            reported an error, or the reply was not a usable version string.
            Every failure is reported through ``self.echo`` before returning.
        """
        self.scheme = 'http'
        if '://' in self.host:
            u = urlparse(self.host, allow_fragments=False)
            self.host = u.hostname
            self.port = u.port or self.port
            self.scheme = u.scheme
        self.url = '{scheme}://{host}:{port}/'.format(scheme=self.scheme, host=self.host, port=self.port)
        self.client = Client(
            self.url,
            self.user,
            self.password,
            self.database,
            self.settings,
            self.stacktrace,
            self.conn_timeout,
            self.conn_timeout_retry,
            self.conn_timeout_retry_delay,
        )

        self.echo.print("Connecting to {host}:{port}".format(
            host=self.host, port=self.port)
        )

        try:
            response = self.client.query('SELECT version();', fmt='TabSeparated')
        except TimeoutError:
            self.echo.error("Error: Connection timeout.")
            return False
        except ConnectionError:
            self.echo.error("Error: Failed to connect.")
            return False
        except DBException as e:
            self.echo.error("Error:")
            self.echo.error(e.error)

            if self.stacktrace and e.stacktrace:
                self.echo.print("Stack trace:")
                self.echo.print(e.stacktrace)

            return False

        if not response.data.endswith('\n'):
            self.echo.error("Error: Request failed: `SELECT version();` query failed.")
            return False

        # A newline-terminated reply is not necessarily a version (e.g. another
        # HTTP service answering on this port). Both a non-numeric component
        # and fewer than three components surface as ValueError here, so an
        # unusable reply is reported instead of crashing the CLI.
        try:
            major, minor, patch = (
                int(part) for part in response.data.strip().split('.')[:3]
            )
        except ValueError:
            self.echo.error("Error: Request failed: `SELECT version();` query failed.")
            return False

        self.server_version = (major, minor, patch)

        self.echo.success(
            "Connected to ClickHouse server v{0}.{1}.{2}.\n".format(
                *self.server_version
            )
        )
        return True
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号