cloudant_hook.py 文件源码

python
阅读 14 收藏 0 点赞 0 评论 0

项目:airflow 作者: apache-airflow 项目源码 文件源码
def get_conn(self):
        def _str(s):
            # cloudant-python doesn't support unicode.
            if isinstance(s, unicode):
                logging.debug(('cloudant-python does not support unicode. '
                               'Encoding %s as ascii using "ignore".'),
                              s)
                return s.encode('ascii', 'ignore')

            return s

        conn = self.get_connection(self.cloudant_conn_id)

        for conn_param in ['host', 'password', 'schema']:
            if not hasattr(conn, conn_param) or not getattr(conn, conn_param):
                raise AirflowException(
                    'missing connection parameter {0}'.format(conn_param)
                )

        # In the connection form:
        # - 'host' is renamed to 'Account'
        # - 'login' is renamed 'Username (or API Key)'
        # - 'schema' is renamed to 'Database'
        #
        # So, use the 'host' attribute as the account name, and, if login is
        # defined, use that as the username.
        account = cloudant.Account(_str(conn.host))

        username = _str(conn.login or conn.host)

        account.login(
            username,
            _str(conn.password)).raise_for_status()

        return account.database(_str(conn.schema))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号