def get_conn(self):
def _str(s):
# cloudant-python doesn't support unicode.
if isinstance(s, unicode):
logging.debug(('cloudant-python does not support unicode. '
'Encoding %s as ascii using "ignore".'),
s)
return s.encode('ascii', 'ignore')
return s
conn = self.get_connection(self.cloudant_conn_id)
for conn_param in ['host', 'password', 'schema']:
if not hasattr(conn, conn_param) or not getattr(conn, conn_param):
raise AirflowException(
'missing connection parameter {0}'.format(conn_param)
)
# In the connection form:
# - 'host' is renamed to 'Account'
# - 'login' is renamed 'Username (or API Key)'
# - 'schema' is renamed to 'Database'
#
# So, use the 'host' attribute as the account name, and, if login is
# defined, use that as the username.
account = cloudant.Account(_str(conn.host))
username = _str(conn.login or conn.host)
account.login(
username,
_str(conn.password)).raise_for_status()
return account.database(_str(conn.schema))
评论列表
文章目录