def __enter__(self):
    """Open a libvirt connection for ``self.uri`` and return it.

    When both ``self.sasl_username`` and ``self.sasl_password`` are set,
    the connection is opened through ``libvirt.openAuth`` with a callback
    that supplies those two values.  Otherwise ``libvirt.openReadOnly`` or
    ``libvirt.open`` is used, depending on ``self.readonly``.

    The opened connection is also stored on ``self.conn``.

    :raises exception.LibvirtConnectionOpenError: when libvirt raises
        ``libvirt.libvirtError`` while opening the connection.
    """
    try:
        if self.sasl_username and self.sasl_password:

            def _supply_sasl(requested, _user_data):
                # Answer each requested credential in place: slot 0 is
                # compared against the credential type, slot 4 receives
                # the value.
                for item in requested:
                    kind = item[0]
                    if kind == libvirt.VIR_CRED_AUTHNAME:
                        item[4] = self.sasl_username
                        continue
                    if kind == libvirt.VIR_CRED_PASSPHRASE:
                        item[4] = self.sasl_password
                return 0

            wanted = [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE]
            auth = [wanted, _supply_sasl, None]
            if self.readonly:
                flags = libvirt.VIR_CONNECT_RO
            else:
                flags = 0
            self.conn = libvirt.openAuth(self.uri, auth, flags)
        elif self.readonly:
            self.conn = libvirt.openReadOnly(self.uri)
        else:
            self.conn = libvirt.open(self.uri)

        return self.conn
    except libvirt.libvirtError as err:
        raise exception.LibvirtConnectionOpenError(uri=self.uri, error=err)
python类VIR_CRED_PASSPHRASE的实例源码
def __enter__(self):
    """Enter the context: connect to ``self.uri`` and hand back the handle.

    Without a complete SASL username/password pair the connection is made
    with ``libvirt.openReadOnly`` (if ``self.readonly``) or
    ``libvirt.open``.  With both SASL values present,
    ``libvirt.openAuth`` is used and a local callback answers the
    AUTHNAME and PASSPHRASE requests.

    The handle is kept on ``self.conn`` and returned.

    :raises exception.LibvirtConnectionOpenError: wraps any
        ``libvirt.libvirtError`` raised while connecting.
    """
    try:
        if not (self.sasl_username and self.sasl_password):
            # No usable SASL pair: plain open, read-only when requested.
            opener = libvirt.openReadOnly if self.readonly else libvirt.open
            self.conn = opener(self.uri)
        else:

            def sasl_answers(cred_list, _ignored):
                # Each entry is filled in place; entry[0] is the type that
                # is matched, entry[4] is where the answer is written.
                for entry in cred_list:
                    if entry[0] == libvirt.VIR_CRED_AUTHNAME:
                        entry[4] = self.sasl_username
                    elif entry[0] == libvirt.VIR_CRED_PASSPHRASE:
                        entry[4] = self.sasl_password
                return 0

            auth = [
                [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE],
                sasl_answers,
                None,
            ]
            open_flags = libvirt.VIR_CONNECT_RO if self.readonly else 0
            self.conn = libvirt.openAuth(self.uri, auth, open_flags)

        return self.conn
    except libvirt.libvirtError as exc:
        raise exception.LibvirtConnectionOpenError(uri=self.uri, error=exc)
def retrieve_credentials(self, credentials, user_data):
    """
    Fill in the credentials libvirt requests during authentication.

    The method has the ``(credentials, user_data)`` signature of a
    libvirt authentication callback.  Every entry of ``credentials`` is
    a mutable sequence: element 0 is matched against the credential
    type, element 4 receives the answer and element 3 is used as the
    fallback when no user name is configured.

    :param credentials: credential requests to be answered in place
    :param user_data: opaque value passed through by the caller; unused
    :type user_data: None
    :returns: ``0`` once every request was answered, ``-1`` as soon as a
        request of a type other than AUTHNAME or PASSPHRASE is seen
    """
    for credential in credentials:
        if credential[0] == libvirt.VIR_CRED_AUTHNAME:
            credential[4] = self.USERNAME
            # Truthiness instead of len(): an unset (None) user name falls
            # back to the default just like an empty string, rather than
            # raising TypeError inside the callback.
            if not credential[4]:
                credential[4] = credential[3]
        elif credential[0] == libvirt.VIR_CRED_PASSPHRASE:
            credential[4] = self.PASSWORD
        else:
            # Only user name and passphrase can be supplied here.
            return -1
    return 0
def __libvirt_auth_credentials_callback(self, credentials, user_data):
    """Answer libvirt credential requests with ``self.login``/``self.passwd``.

    Each entry of ``credentials`` is updated in place: element 0 is
    matched against the credential type, element 4 receives the answer,
    and element 3 is used as the fallback when no login is configured.
    ``user_data`` is not used.

    Returns 0 when every request was answered, or -1 as soon as a request
    of a type other than AUTHNAME or PASSPHRASE is encountered.
    """
    for credential in credentials:
        if credential[0] == libvirt.VIR_CRED_AUTHNAME:
            credential[4] = self.login
            # Truthiness instead of len(): a login of None falls back to
            # the default like an empty string instead of raising
            # TypeError.
            if not credential[4]:
                credential[4] = credential[3]
        elif credential[0] == libvirt.VIR_CRED_PASSPHRASE:
            credential[4] = self.passwd
        else:
            return -1
    return 0
def __connect_tcp(self):
    """Open and return an authenticated ``qemu+tcp`` connection to ``self.host``.

    On ``libvirtError`` nothing is returned (implicit ``None``); the error
    text is stored in ``self.last_error`` and ``self.connection`` is reset
    to ``None``.
    """
    auth = [
        [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE],
        self.__libvirt_auth_credentials_callback,
        None,
    ]
    target = 'qemu+tcp://%s/system' % self.host
    try:
        return libvirt.openAuth(target, auth, 0)
    except libvirtError as err:
        self.last_error = 'Connection Failed: ' + str(err)
        self.connection = None
def __libvirt_auth_credentials_callback(self, credentials, user_data):
    """Supply the stored login and password to libvirt.

    ``credentials`` is iterated and each entry is answered in place
    (element 0: type that is matched, element 3: fallback value,
    element 4: answer slot).  ``user_data`` is ignored.

    Returns 0 on success and -1 for the first request whose type is
    neither AUTHNAME nor PASSPHRASE.
    """
    for credential in credentials:
        if credential[0] == libvirt.VIR_CRED_AUTHNAME:
            credential[4] = self.login
            # A missing login (None or "") falls back to the default
            # carried in the request; len() would raise TypeError on None.
            if not credential[4]:
                credential[4] = credential[3]
        elif credential[0] == libvirt.VIR_CRED_PASSPHRASE:
            credential[4] = self.passwd
        else:
            return -1
    return 0
def __connect_tcp(self):
    """Connect to ``self.host`` over ``qemu+tcp`` with credential callback auth.

    On success the connection is stored in ``self.connection`` and
    ``self.last_error`` is cleared.  On ``libvirtError`` the message is
    recorded in ``self.last_error`` and ``self.connection`` is set to
    ``None``.
    """
    requested_types = [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE]
    auth_spec = [requested_types, self.__libvirt_auth_credentials_callback, None]
    endpoint = 'qemu+tcp://%s/system' % self.host
    try:
        self.connection = libvirt.openAuth(endpoint, auth_spec, 0)
        self.last_error = None
    except libvirtError as exc:
        self.last_error = 'Connection Failed: ' + str(exc)
        self.connection = None
def _get_auth_conn(config):
    """Open an authenticated libvirt connection described by ``config``.

    ``config["uri"]`` is the connection URI; ``config.get("username")``
    and ``config.get("password")`` are handed to libvirt when it asks for
    an AUTHNAME or PASSPHRASE credential.  Returns whatever
    ``libvirt.openAuth`` returns.
    """
    def _answer(requests, _user_data):
        # Fill the answer slot (index 4) of each request in place.
        for req in requests:
            req_type = req[0]
            if req_type == libvirt.VIR_CRED_AUTHNAME:
                req[4] = config.get("username")
                continue
            if req_type == libvirt.VIR_CRED_PASSPHRASE:
                req[4] = config.get("password")
        return 0

    cred_types = [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE]
    return libvirt.openAuth(config["uri"], [cred_types, _answer, None], 0)
def request_cred(self, credentials, user_data):
    """Answer libvirt credential requests from the host settings.

    AUTHNAME requests receive ``self.host_username`` and PASSPHRASE
    requests receive ``self.host_password``; other request types are left
    untouched.  ``user_data`` is unused.  Always returns 0.
    """
    for entry in credentials:
        kind = entry[0]
        if kind == libvirt.VIR_CRED_AUTHNAME:
            entry[4] = self.host_username
            continue
        if kind == libvirt.VIR_CRED_PASSPHRASE:
            entry[4] = self.host_password
    return 0
def connect(self):
    """
    Open a libvirt connection according to ``self.host_protocol``.

    Supported protocols are ``'libssh2'`` (password authentication via
    ``self.request_cred``), ``'ssh'`` (key file from ``self.host_key``)
    and ``'qemu'`` (local ``qemu:///system``).  For the two remote
    protocols a ``self.host_url`` of ``None`` also falls back to the
    local ``qemu:///system`` URI.

    On success the connection is stored in ``self.conn`` and in
    ``SETTINGS['connection']``.

    Returns ``False`` when ``self.conn_status`` is ``False`` or when no
    connection object was obtained (for example an unrecognised
    protocol).  After a successful connection nothing is returned
    explicitly, i.e. the result is ``None``.
    """
    if self.conn_status is False:
        return False

    conn = None
    if self.host_protocol == 'libssh2':
        # The auth spec is kept on the instance even when the local
        # fallback below ends up being used.
        self.auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE], self.request_cred, None]
        if self.host_url is None:
            conn = libvirt.open("qemu:///system")
        else:
            url = "{}://{}/?sshauth=password".format(VIRT_CONN_LIBSSH2, self.host_url)
            conn = libvirt.openAuth(url, self.auth, 0)
    elif self.host_protocol == 'ssh':
        if self.host_url is None:
            conn = libvirt.open("qemu:///system")
        else:
            url = "{}://{}@{}/system?socket=/var/run/libvirt/libvirt-sock&keyfile={}".format(VIRT_CONN_SSH, self.host_username,
                                                                                             self.host_url,
                                                                                             self.host_key)
            conn = libvirt.open(url)
    elif self.host_protocol == 'qemu':
        conn = libvirt.open("qemu:///system")

    # NOTE(review): the libvirt open calls presumably raise on failure
    # rather than return None, so this branch mainly covers an unknown
    # protocol - confirm against the libvirt bindings in use.
    if conn is None:
        logging.error('Connection to hypervisor failed!')
        return False

    logging.info('Connection succesfull.')
    self.conn = conn
    SETTINGS['connection'] = self.conn
def __connect(self):
    """Establish a connection to the hypervisor at ``self.URI``.

    The connection is opened with ``libvirt.openAuth``, using
    ``self.retrieve_credentials`` as the credential callback for AUTHNAME
    and PASSPHRASE requests, and stored in ``self.SESSION``.

    :raises SessionException: if ``libvirt.openAuth`` returned ``None``
    """
    # Auth spec: [requested credential types, callback, user data].
    auth = [
        [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE],
        self.retrieve_credentials, None
    ]
    self.SESSION = libvirt.openAuth(self.URI, auth, 0)
    # Identity test: None is a singleton, and "is" cannot be affected by
    # a custom __eq__ on the connection object.
    if self.SESSION is None:
        raise SessionException("Unable to establish connection to hypervisor!")
def test_open_auth_ignored(self):
    """``convirt.openAuth`` yields a truthy connection when given auth data.

    NOTE(review): the test name suggests convirt ignores the supplied
    credentials - confirm against the convirt implementation.
    """
    def supply(requested, _user_data):
        for item in requested:
            kind = item[0]
            if kind == libvirt.VIR_CRED_AUTHNAME:
                item[4] = 'convirt'
                continue
            if kind == libvirt.VIR_CRED_PASSPHRASE:
                item[4] = 'ignored'
        return 0

    wanted = [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE]
    connection = convirt.openAuth('convirt:///system', [wanted, supply, None])
    self.assertTrue(connection)
def auth_callback(credentials, user_data):
    """Answer libvirt credential requests from the module-level ``config``.

    AUTHNAME requests receive ``config.get('libvirt_username')`` and
    PASSPHRASE requests receive ``config.get('libvirt_password')``; other
    request types are left untouched.  ``user_data`` is unused.  Always
    returns 0.
    """
    for request in credentials:
        wanted = request[0]
        if wanted == libvirt.VIR_CRED_AUTHNAME:
            request[4] = config.get('libvirt_username')
            continue
        if wanted == libvirt.VIR_CRED_PASSPHRASE:
            request[4] = config.get('libvirt_password')
    return 0
def get_libvirt_connection(name, libvirt_url='qemu:///system'):
    """Return the libvirt connection cached under ``name``, opening it if needed.

    Connections are kept in ``LIBVIRT_CONNECTIONS``.  When ``name`` has
    no entry yet, a connection to ``libvirt_url`` is opened with
    ``libvirt.openAuth`` (using ``auth_callback`` for AUTHNAME and
    PASSPHRASE requests) and stored before being returned.
    """
    if name in LIBVIRT_CONNECTIONS:
        return LIBVIRT_CONNECTIONS[name]

    cred_types = [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE]
    LIBVIRT_CONNECTIONS[name] = libvirt.openAuth(
        libvirt_url, [cred_types, auth_callback, None])
    return LIBVIRT_CONNECTIONS[name]
def __libvirt_auth_credentials_callback(self, credentials, user_data):
    """Fill libvirt credential requests from ``self.login`` and ``self.passwd``.

    Every entry of ``credentials`` is modified in place: element 0 is the
    type that is matched, element 4 is the answer slot, and element 3 is
    the fallback used when no login is set.  ``user_data`` is unused.

    Returns 0 if all requests were answered, or -1 on the first request
    that is neither an AUTHNAME nor a PASSPHRASE request.
    """
    for credential in credentials:
        if credential[0] == libvirt.VIR_CRED_AUTHNAME:
            credential[4] = self.login
            # "not" covers both None and "" so an unset login uses the
            # default from the request instead of raising TypeError in
            # len().
            if not credential[4]:
                credential[4] = credential[3]
        elif credential[0] == libvirt.VIR_CRED_PASSPHRASE:
            credential[4] = self.passwd
        else:
            return -1
    return 0
def __connect_tcp(self):
    """Open the ``qemu+tcp`` connection to ``self.host`` and record the outcome.

    Success: the connection goes to ``self.connection`` and
    ``self.last_error`` becomes ``None``.  Failure (``libvirtError``):
    the message is stored in ``self.last_error``, passed to
    ``log_error``, and ``self.connection`` is set to ``None``.
    """
    auth = [
        [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE],
        self.__libvirt_auth_credentials_callback,
        None,
    ]
    address = 'qemu+tcp://%s/system' % self.host
    try:
        self.connection = libvirt.openAuth(address, auth, 0)
        self.last_error = None
    except libvirtError as failure:
        self.last_error = 'Connection Failed: ' + str(failure)
        log_error(self.last_error)
        self.connection = None