def getCallString(level):
    """Describe a calling function as ``name (line N)``.

    ``level`` counts frames above the immediate caller: 0 describes the
    function that called ``getCallString``, 1 describes that function's
    caller, and so on.  A negative ``level`` describes ``getCallString``
    itself.

    When the selected frame has a local variable named ``self`` the caller
    is assumed to be a method and the result is prefixed with the class
    name: ``ClassName::name (line N)``.

    If ``level`` reaches past the top of the call stack, the outermost
    frame is described instead of failing.
    """
    # sys._getframe() returns the same frame the old raise/except trick dug
    # out of the traceback, without having to raise anything.
    frame = sys._getframe()
    while level >= 0 and frame.f_back is not None:
        frame = frame.f_back
        level -= 1
    location = "%s (line %d)" % (frame.f_code.co_name, frame.f_lineno)
    # Compare against None rather than testing truthiness: an instance that
    # defines __len__/__nonzero__ may be falsy and is still a method owner.
    owner = frame.f_locals.get("self")
    if owner is None:
        return location
    return owner.__class__.__name__ + "::" + location
#send this message to all handlers of std messages
# Example source code using Python's Exception() class
def apply_gate(self, gate, on_qubit_name):
    """Apply a single-qubit ``gate`` to the qubit named ``on_qubit_name``.

    The register holding the qubit is looked up through ``self.qubits``.
    If that register carries a non-empty "noop" value, the value is
    restored as its state (with a printed notice) before the gate is
    applied.

    For a register that is not entangled the gate multiplies the state
    directly.  For an entangled register the gate is expanded with
    ``np.kron`` so that ``Gate.eye`` sits in every other position.

    Raises ``Exception`` when the entanglement flag and the number of
    qubits in the register disagree.
    """
    register = self.qubits.get_quantum_register_containing(on_qubit_name)
    if len(register.get_noop()) > 0:
        print("NOTE this qubit has been measured previously, there should be no more gates allowed but we are reverting that measurement for consistency with IBM's language")
        register.set_state(register.get_noop())
        register.set_noop([])

    if not register.is_entangled():
        if register.get_num_qubits() != 1:
            raise Exception("This qubit is not marked as entangled but it has an entangled state")
        register.set_state(gate * register.get_state())
        return

    if register.get_num_qubits() <= 1:
        raise Exception("This qubit is marked as entangled but it does not have an entangled state")
    n_entangled = len(register.get_entangled())
    target_slot = [qb.name for qb in register.get_entangled()].index(on_qubit_name)
    # Slot 0 seeds the product; every later slot is appended with kron.
    full_gate = gate if target_slot == 0 else Gate.eye
    for slot in range(1, n_entangled):
        full_gate = np.kron(full_gate, gate if slot == target_slot else Gate.eye)
    register.set_state(full_gate * register.get_state())
# Source file: QuantumComputer.py
# Project: QuantumComputingEvolutionaryAlgorithmDesign
# Author: llens
# Project source
# File source
# Views: 24
# Favorites: 0
# Likes: 0
# Comments: 0
def apply_gate(self, gate, on_qubit_name):
    """Apply ``gate`` to one named qubit, expanding it if the qubit is entangled.

    Steps, in order:

    1. Find the register for ``on_qubit_name`` via ``self.qubits``.
    2. If the register has a non-empty "noop" value, print a notice, make
       that value the register state and clear the noop.
    3. Not entangled: multiply the state by ``gate``.
       Entangled: build one operator per entangled qubit (``gate`` for the
       named qubit, ``Gate.eye`` for the others), combine them left to
       right with ``np.kron`` and multiply the state by the result.

    Raises ``Exception`` if the register's entanglement flag contradicts
    its qubit count.
    """
    on_qubit = self.qubits.get_quantum_register_containing(on_qubit_name)
    previously_measured = len(on_qubit.get_noop()) > 0
    if previously_measured:
        print("NOTE this qubit has been measured previously, there should be no more gates allowed but we are reverting that measurement for consistency with IBM's language")
        on_qubit.set_state(on_qubit.get_noop())
        on_qubit.set_noop([])

    if on_qubit.is_entangled():
        if not on_qubit.get_num_qubits() > 1:
            raise Exception("This qubit is marked as entangled but it does not have an entangled state")
        count = len(on_qubit.get_entangled())
        names = [qb.name for qb in on_qubit.get_entangled()]
        position = names.index(on_qubit_name)
        # One operator per entangled qubit, in register order.
        operators = [gate if i == position else Gate.eye for i in range(count)]
        combined = operators[0]
        for operator in operators[1:]:
            combined = np.kron(combined, operator)
        on_qubit.set_state(combined * on_qubit.get_state())
    else:
        if on_qubit.get_num_qubits() != 1:
            raise Exception("This qubit is not marked as entangled but it has an entangled state")
        on_qubit.set_state(gate * on_qubit.get_state())
def _executeSysCmd(cmd):
    """
    Helper function to execute a system command

    Input:
        cmd: string, the command to be executed

    Output:
        plain string output of the command as captured by
        commands.getstatusoutput, or '' when the reported status is 256.

    Any exception raised while running the command propagates unchanged.
    """
    # The previous ``except Exception, e: raise e`` wrapper handled nothing
    # and, in Python 2, restarted the traceback at the re-raise, hiding
    # where the failure actually happened.
    status, output = commands.getstatusoutput(cmd)
    # getstatusoutput reports a wait()-style status, so 256 corresponds to
    # exit code 1; that case is mapped to an empty result.
    if status == 256:
        return ''
    return output
def tearDown(self):
    ''' Best-effort cleanup hook; drivers may override it when they need
        different cleanup, such as deleting the test database.

        The default opens a connection with self._connect(), executes
        self.xddl1 and self.xddl2 (committing after each) and always closes
        the connection.  Nothing is ever raised from here.
    '''
    try:
        connection = self._connect()
        try:
            cursor = connection.cursor()
            for statement in (self.xddl1, self.xddl2):
                try:
                    cursor.execute(statement)
                    connection.commit()
                except self.driver.Error:
                    # Assume the table was never created.  Other tests
                    # will notice if execute itself is broken.
                    pass
        finally:
            connection.close()
    except _BaseException:
        # Cleanup must not turn a finished test into an error.
        pass
def _get_raw_filehandle(self):
    """Close the current handle and open the next raw file read-only.

    The next file name comes from ``self.df.next()``.  When that raises
    ``IndexError`` the handle ``self.df.fh`` is set to None.  When a name
    is returned it is opened with ``os.open`` and the resulting descriptor
    is stored in ``self.df.fh``.

    A failure to open is not raised: its text is appended to ``self.ERRS``
    (and echoed to stderr in verbose mode) and ``self.df.fh`` is set to
    None.
    """
    self.close()
    rawfile = None
    try:
        rawfile = self.df.next()
    except IndexError:
        self.df.fh = None

    if not rawfile:
        return

    if self.verbose:
        sys.stderr.write("\tOpening: %s\n" % rawfile)
    try:
        self.df.fh = os.open(rawfile, os.O_RDONLY)
    except Exception as e:
        # str(e) rather than e.message: ``message`` is empty for
        # exceptions built from several arguments (OSError among them)
        # and is deprecated, so the reason used to be lost.
        reason = str(e)
        self.ERRS.append(reason)
        if self.verbose:
            sys.stderr.write("%s\n" % reason)
        self.df.fh = None
# Try to read the next buffers worth of data
def _get_raw_file_buf(self):
    """Load the contents of the next matching archive member into self.buf.

    ``self.buf`` is cleared first.  Members are then pulled from
    ``self.df.next()`` until one whose name matches ``fileRE`` or
    ``sohRE`` is found; a falsy member ends the search and the method
    returns with the buffer left empty.

    The member is read through ``self.df.fh.extractfile``.  A read failure
    is not raised: its text is appended to ``self.ERRS`` (and echoed to
    stderr in verbose mode).  ``self.buf`` is only set when data was read.
    """
    self.buf.clear()
    # Skip members until one has an expected file name.
    while True:
        member = self.df.next()
        if not member:
            return
        if fileRE.match(member.name) or sohRE.match(member.name):
            break

    if self.verbose:
        sys.stderr.write("\tReading: %s" % member.name)

    data = ''
    try:
        data = self.df.fh.extractfile(member).read()
    except Exception as e:
        # str(e) rather than e.message: ``message`` is empty for
        # exceptions built from several arguments and is deprecated.
        reason = str(e)
        self.ERRS.append(reason)
        if self.verbose:
            sys.stderr.write("%s\n" % reason)

    if data:
        self.buf.set(data)
def _get_raw_names(self):
    """Open the tar file ``self.df.basefile`` and register its sorted members.

    The opened archive is stored in ``self.df.fh``.  Its members are
    sorted with ``self._member_cmp`` and handed to ``self.df.set``.

    A failure to open is recorded in ``self.ERRS`` (and echoed to stderr
    in verbose mode) rather than raised.
    """
    if self.verbose:
        sys.stderr.write("Opening: %s\n" % self.df.basefile)

    try:
        self.df.fh = tarfile.open(self.df.basefile)
    except Exception as e:
        # str(e) rather than e.message: ``message`` is empty for
        # exceptions built from several arguments (IOError among them)
        # and is deprecated.
        reason = str(e)
        self.ERRS.append(reason)
        if self.verbose:
            sys.stderr.write("%s\n" % reason)

    # NOTE(review): as before, execution continues after a failed open and
    # uses whatever self.df.fh held previously - confirm whether an early
    # return is wanted here.
    # Get tar file 'members' and sort them
    members = self.df.fh.getmembers()
    members.sort(self._member_cmp)
    self.df.set(members)
def packet_time_epoch(self, p):
    """Return ``(epoch, ms)`` for packet ``p``.

    A ``timedoy.TimeDOY`` is built from the packet's year, day of year,
    hour, minute and second fields; its ``epoch()`` value is the first
    element of the result and ``int(p.ms)`` is the second.

    Errors from ``TimeDOY`` are not handled here and propagate to the
    caller.
    """
    stamp = timedoy.TimeDOY(
        year=p.year,
        month=None,
        day=None,
        hour=p.hr,
        minute=p.mn,
        second=p.sc,
        microsecond=0,
        doy=p.doy,
        epoch=None,
        dtobject=None,
    )
    seconds = stamp.epoch()
    milliseconds = int(p.ms)
    return seconds, milliseconds
def isSEGD(self, expected_manufactures_code=0):
    ''' Check to see if we are a Fairfield SEG-D file.

        Reads general header block 1, rewinds self.FH to offset 0 and
        compares the header's 'manufactures_code' with
        expected_manufactures_code.

        Returns True on a match.  On a mismatch self.FH is closed and
        False is returned.

        Raises InputsError, carrying the text of the underlying error,
        if anything goes wrong while reading or comparing.
    '''
    try:
        header = self.read_general_header_block_1()
        self.FH.seek(0)
        if header['manufactures_code'] == expected_manufactures_code:
            return True
        if self.FH is not None:
            self.FH.close()
    except Exception as e:
        # str(e) rather than e.message: ``message`` is empty for
        # exceptions built from several arguments (IOError among them)
        # and is deprecated.
        raise InputsError(str(e))
    return False
#
# Mix-ins
#
def parse_trigger(self, trig, buf):
    """Parse ``buf`` with the trigger parser selected by the code ``trig``.

    Recognised codes: 'EVT', 'TIM', 'LEV', 'CON', 'RAD', 'TML' ('RAD' and
    'TML' share the time-list parser), 'EXT' and 'CRS'.

    Returns whatever the selected parser's ``parse(buf)`` returns.

    Raises CorruptPacketError for an unrecognised code or for any failure
    while parsing.
    """
    try:
        if trig == 'EVT':
            t = event_trigger()
        elif trig == 'TIM':
            t = time_trigger()
        elif trig == 'LEV':
            t = level_trigger()
        elif trig == 'CON':
            t = continuous_trigger()
        # XXX
        elif trig == 'RAD' or trig == 'TML':
            t = time_list_trigger()
        elif trig == 'EXT':
            t = external_trigger()
        elif trig == 'CRS':
            t = cross_stream_trigger()
        else:
            # An unknown code used to fall through to ``t.parse`` with
            # ``t`` unbound; it was reported as CorruptPacketError only by
            # accident.  Same exception type, now with a meaningful text.
            raise ValueError("unknown trigger type %r" % (trig,))
        ret = t.parse(buf)
    except Exception as e:
        # str(e) rather than the deprecated e.message.
        raise CorruptPacketError("DS Packet: " + str(e))
    return ret
def fdsn2epoch(fdsn, fepoch=False):
    '''
    Convert YYYY-MM-DDTHH:MM:SS.ssssss to epoch

    The date and the hour/minute fields are parsed as integers and the
    seconds field as a float.  ``fepoch`` is passed straight through to
    ``TimeDOY.epoch``.

    Raises TimeError when the string cannot be split and converted.
    '''
    try:
        date_part, time_part = fdsn.split('T')
        yr, mo, da = [int(field) for field in date_part.split('-')]
        clock = time_part.split(':')
        hr, mn = [int(field) for field in clock[:-1]]
        sc = float(clock[2])
    except Exception:
        raise TimeError

    stamp = TimeDOY(year=yr,
                    month=mo,
                    day=da,
                    hour=hr,
                    minute=mn,
                    second=sc)
    return stamp.epoch(fepoch=fepoch)
def passcal2epoch(lopt, sep=':', fepoch=False):
    '''
    Convert "YYYY:DOY:HH:MM:SS[.sss]" to epoch

    ``sep`` is the field separator.  Year, day of year, hour and minute
    are parsed as integers and the fifth field (seconds) as a float.
    ``fepoch`` is passed straight through to ``TimeDOY.epoch``.

    Raises TimeError when the string cannot be split and converted.
    '''
    try:
        fields = lopt.split(sep)
        yr, jd, hr, mn = [int(field) for field in fields[:-1]]
        sc = float(fields[4])
    except Exception:
        raise TimeError

    stamp = TimeDOY(year=yr,
                    hour=hr,
                    minute=mn,
                    second=sc,
                    doy=jd)
    return stamp.epoch(fepoch=fepoch)
def __init__(self, error='', error_code=uge_status.UGE_ERROR, **kwargs):
    """
    Class constructor.

    The exception message is ``error``; when that is empty the 'args'
    keyword is used instead.  If an 'exception' keyword holds an exception
    instance, its text either becomes the message (when the message is
    still empty) or is appended to it in parentheses.

    :param error: Error message.
    :type error: str

    :param error_code: Error code.
    :type error_code: int

    :param kwargs: Keyword arguments, may contain 'args=error_message', 'exception=exception_object', or 'error_details=details'.
    """
    message = error
    if message == '':
        message = kwargs.get('args', '')

    cause = kwargs.get('exception', None)
    # isinstance() is already False for None, so no separate None test.
    if isinstance(cause, exceptions.Exception):
        cause_text = "%s" % (cause)
        if message == '':
            message = cause_text
        else:
            message = "%s (%s)" % (message, cause_text)

    exceptions.Exception.__init__(self, message)
    self.error_code = error_code
    self.error_details = kwargs.get('error_details', None)
def connectIDMChannel(self, idm_ior=None ):
    """Create the IDM channel publisher if the device does not have one yet.

    Nothing is done when ``self._idm_publisher`` is already set.

    With a non-empty ``idm_ior`` the channel object is resolved from that
    IOR and wrapped in a ``Publisher``.  Otherwise, when a domain manager
    is known, the publisher is requested from the event manager, which is
    created and cached in ``self._ecm`` on first use.

    Failures on either path are logged as warnings and not raised; the
    publisher then stays unset.
    """
    self._log.debug("Connecting to IDM CHANNEL idm_ior:" + str(idm_ior) )

    if self._idm_publisher is not None:
        return

    if idm_ior is not None and idm_ior != "":
        # Get DomainManager incoming event channel and connect the device
        # to it, where applicable.
        try:
            idm_channel_obj = resource.createOrb().string_to_object(idm_ior)
            idm_channel = idm_channel_obj._narrow(CosEventChannelAdmin.EventChannel)
            self._idm_publisher = Publisher( idm_channel )
            self._log.info("Connected to IDM CHANNEL, (command line IOR).... DEV-ID:" + self._id )
        except Exception:
            self._log.warn("Unable to connect to IDM channel (command line IOR).... DEV-ID:" + self._id )
        return

    try:
        # make sure we have access to the EventChannelManager for the domain
        if self._domMgr:
            if self._ecm is None:
                self._log.debug("Setting up EventManager .... DEV-ID:" + self._id )
                evt_mgr = Manager.GetManager(self)
                self._ecm = evt_mgr
            else:
                evt_mgr = self._ecm

            self._log.debug("Requesting registration with IDM Channel .... DEV-ID:" + self._id )
            self._idm_publisher = evt_mgr.Publisher( ossie.events.IDM_Channel_Spec )
            self._log.info("Registered with IDM CHANNEL (Domain::EventChannelManager).... DEV-ID:" + self._id )
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit.
        self._log.warn("Unable to connect to IDM channel (Domain::EventChannelManager).... DEV-ID:" + self._id )
#########################################
# CF::LifeCycle
def releaseObject(self):
"""
Shut this device down and release it.

Visible steps, in source order: mark an UNLOCKED device as
SHUTTING_DOWN, release the child devices, detach from the composite
device, unregister, mark the device LOCKED, release the command lock
and finally call resource.Resource.releaseObject.

An Exception raised during the child/composite/unregister phase is
re-raised as CF.LifeCycle.ReleaseError carrying str(e).

NOTE(review): indentation was lost in this copy, so the exact nesting
of the try/except/else sections below is inferred from statement
order - confirm against the original file.
"""
self._log.debug("releaseObject()")
# Only an UNLOCKED device is moved to SHUTTING_DOWN.
if self._adminState == CF.Device.UNLOCKED:
self._adminState = CF.Device.SHUTTING_DOWN
try:
# release all of the child devices
# if they have included the AggregateDeviceMixIn
try:
# Attribute probe: raises AttributeError when there is no _childDevices.
childDevice = self._childDevices
except AttributeError:
pass
else:
# Pop each child off the list and release it until the list is empty.
while len(self._childDevices)>0:
child = self._childDevices.pop()
child.releaseObject()
# remove device from parent and set compositeDevice to None
if self._compositeDevice:
self._compositeDevice.removeDevice(self._this())
self._compositeDevice = None
self._unregister()
except Exception, e:
# Report the failure through the LifeCycle exception type.
raise CF.LifeCycle.ReleaseError(str(e))
self._adminState = CF.Device.LOCKED
try:
# Errors from releasing the command lock are deliberately ignored.
self._cmdLock.release()
except:
pass
try:
resource.Resource.releaseObject(self)
except:
# A failure in the base-class release is logged, not raised.
self._log.error("failed releaseObject()")
###########################################
# CF::Device
def _validateAllocProps(self, properties):
    """Check that every requested property can be allocated.

    Each property is checked with ``self._props.isAllocatable(prop.id)``.
    A negative answer, or an error while asking, is logged and reported
    as ``CF.Device.InvalidCapacity`` for the first offending property.

    Returns None when all properties pass.
    """
    self._log.debug("validating")
    # Validate before trying to consume
    for prop in properties:
        try:
            allocatable = self._props.isAllocatable(prop.id)
        except Exception:
            # A failed lookup counts as "not allocatable".  This replaces
            # a bare ``except:`` that also converted KeyboardInterrupt and
            # SystemExit into InvalidCapacity.
            allocatable = False
        if not allocatable:
            self._log.error("Property %s is not allocatable", prop.id)
            raise CF.Device.InvalidCapacity("Invalid capacity %s" % prop.id, [prop])
def allocateCapacity(self, properties):
    """
    Validate the requested properties, convert them into a dictionary
    keyed by property id and pass that dictionary to
    self._allocateCapacities(propdict).

    Input:
        <properties> A list of CF.DataType properties to allocate

    Output:
        Whatever self._allocateCapacities returns (a flag saying whether
        all the allocations succeeded), or False when it fails with an
        unexpected error.

    CF.Device.InvalidCapacity and CF.Device.InvalidState pass through to
    the caller unchanged.
    """
    self._log.debug("allocateCapacity(%s)", properties)

    # Validate
    self._validateAllocProps(properties)

    # Consume: convert every value with its property definition
    requested = {}
    for item in properties:
        requested[item.id] = self._props.getPropDef(item.id)._fromAny(item.value)

    try:
        return self._allocateCapacities(requested)
    except (CF.Device.InvalidCapacity, CF.Device.InvalidState):
        raise  # re-raise valid exceptions
    except Exception as e:
        self._log.exception("Unexpected error in _allocateCapacities: %s", str(e))
        return False
def _unregister(self):
    """
    Unregister with the DeviceManager.  The call can hang (omniORB will
    sometimes block in unregisterDevice when the system is short of
    threads), so it is run through resource.callOmniorbpyWithTimeout.

    Failures and timeouts are logged, never raised.
    """
    def report_failure(msg=""):
        self._log.error("Could not unregister from DeviceManager: %s", msg)

    def unregister_worker():
        if self._devmgr:
            self._log.debug("Unregistering from DeviceManager")
            try:
                self._devmgr.unregisterDevice(self._this())
            except CORBA.Exception as e:
                report_failure(str(e))
        # Signal completion either way: the unregister succeeded, or its
        # failure has already been logged.
        done.put(True)

    done = Queue(maxsize=1)
    finished = resource.callOmniorbpyWithTimeout(unregister_worker, done)
    if not finished:
        report_failure("timeout while attempting to unregister")
def _copyFile(self, fileSystem, remotePath, localPath):
    """Copy ``remotePath`` from ``fileSystem`` to the local ``localPath``.

    The remote file is read in chunks of at most 1 MiB until the size
    reported by ``sizeOf()`` has been transferred or a read returns no
    data.

    If the local file cannot be opened because it is busy (ETXTBSY), the
    existing file is renamed to ``localPath + "_" + <seconds>`` and a
    fresh file is created in its place.

    Returns the name the busy file was renamed to, or None when no rename
    was needed.  Any other error propagates; both files are closed on
    every path.
    """
    import errno  # local import: only needed for the busy-file check

    self._log.debug("Copy file %s -> %s", remotePath, os.path.abspath(localPath))
    modifiedName = None
    chunkSize = 1024 * 1024
    fileToLoad = fileSystem.open(remotePath, True)
    try:
        try:
            localFile = open(localPath, "w+")
        except EnvironmentError as e:
            # Check errno instead of ``"Text file busy" in e``, which only
            # worked through Python 2's indexable exceptions.
            if e.errno != errno.ETXTBSY:
                raise
            modifiedName = localPath + "_" + str(time.time()).split('.')[0]
            os.rename(localPath, modifiedName)
            localFile = open(localPath, "w+")
        try:
            remaining = fileToLoad.sizeOf()
            while remaining > 0:
                buf = fileToLoad.read(min(chunkSize, remaining))
                if len(buf) == 0:
                    break
                localFile.write(buf)
                remaining -= len(buf)
        finally:
            localFile.close()
    finally:
        # Previously the remote handle leaked if the copy loop failed.
        fileToLoad.close()
    return modifiedName
def _unloadAll(self):
    """Force-unload every file named in ``self._loadedFiles``.

    A failure for one file is logged with its traceback and does not stop
    the remaining files from being unloaded.
    """
    loaded_names = self._loadedFiles.keys()
    for loaded_name in loaded_names:
        try:
            self._log.debug("Forcing unload(%s)", loaded_name)
            self._unload(loaded_name, force=True)
        except Exception:
            self._log.exception("Failed to unload file %s", loaded_name)
def state_from_string(qubit_state_string):
    """Build a multi-qubit state from a string of '0' and '1' characters.

    Each character selects ``State.zero_state`` or ``State.one_state``;
    the per-qubit states are combined left to right with ``np.kron``.

    Returns None for an empty string.

    Raises ``Exception`` when the description contains anything other
    than '0' and '1'.
    """
    if not all(symbol in ('0', '1') for symbol in qubit_state_string):
        raise Exception("Description must be a string in binary")
    state = None
    for symbol in qubit_state_string:
        contribution = State.zero_state if symbol == '0' else State.one_state
        # ``is None``, not ``== None``: comparing a NumPy array with ``==``
        # is elementwise and raises as soon as the accumulated state has
        # more than one element.
        if state is None:
            state = contribution
        else:
            state = np.kron(state, contribution)
    return state
def num_qubits(state):
    """Return how many qubits (1 to 5) a column-vector ``state`` describes.

    ``state`` must have shape ``(2**n, 1)`` with ``n`` between 1 and 5.

    Raises ``Exception`` for any other shape.
    """
    # Map the row count straight to a qubit count instead of testing a
    # floating-point log2 result for membership in a list of ints.
    qubits_by_rows = {2: 1, 4: 2, 8: 3, 16: 4, 32: 5}
    rows, cols = state.shape[0], state.shape[1]
    if cols != 1 or rows not in qubits_by_rows:
        raise Exception("unrecognized state shape")
    return qubits_by_rows[rows]
def get_quantum_register_containing(self, name):
    """Return the qubit object called ``name``.

    Each entry of ``self._qubits`` is checked first by its own name and
    then through the entries of its ``get_entangled()`` list; the first
    object whose name matches is returned.

    Raises ``Exception`` when no qubit has that name.
    """
    for register in self._qubits:
        if register.name == name:
            return register
        for member in register.get_entangled():
            if member.name == name:
                return member
    raise Exception("qubit %s not found" % name)
def probabilities_equal(self, name, prob):
    """Compare the measurement probabilities of the named qubits with ``prob``.

    ``name`` is a comma-separated list of qubit names.  When a single
    qubit is requested and the computer is in canonical ordering, or the
    requested registers appear in the entangled-qubit ordering, the first
    register's state is used directly.  Otherwise the state is rebuilt by
    ``self.get_requested_state_order`` and compared with ``atol=1e-2``.

    Returns the result of ``np.allclose``.

    Raises ``Exception`` if the qubits are not requested in increasing
    order.
    """
    registers = [
        self.qubits.get_quantum_register_containing(part.strip())
        for part in name.split(',')
    ]
    if not QuantumRegisterCollection.is_in_increasing_order(registers):
        raise Exception("at this time, requested qubits must be in increasing order")
    entangled_order = self.qubits.get_entangled_qubit_order()
    single_canonical = len(registers) == 1 and self.is_in_canonical_ordering()
    if single_canonical or (registers in entangled_order):
        probabilities = Probability.get_probabilities(registers[0].get_state())
        return np.allclose(probabilities, prob)
    answer_state = self.get_requested_state_order(name)
    return np.allclose(Probability.get_probabilities(answer_state), prob, atol=1e-2)
def qubit_states_equal(self, name, state):
    """Compare the state of the named qubits with ``state``.

    ``name`` is a comma-separated list of qubit names.  When a single
    qubit is requested and the computer is in canonical ordering, or the
    requested registers appear in the entangled-qubit ordering, the first
    register's state is compared directly.  Otherwise the state rebuilt
    by ``self.get_requested_state_order`` is compared.

    Returns the result of ``np.allclose``.

    Raises ``Exception`` if the qubits are not requested in increasing
    order.
    """
    registers = [
        self.qubits.get_quantum_register_containing(part.strip())
        for part in name.split(',')
    ]
    if not QuantumRegisterCollection.is_in_increasing_order(registers):
        raise Exception("at this time, requested qubits must be in increasing order")
    entangled_order = self.qubits.get_entangled_qubit_order()
    single_canonical = len(registers) == 1 and self.is_in_canonical_ordering()
    if single_canonical or (registers in entangled_order):
        return np.allclose(registers[0].get_state(), state)
    return np.allclose(self.get_requested_state_order(name), state)
def apply_two_qubit_gate_CNOT(self, first_qubit_name, second_qubit_name):
    """Apply a CNOT using the first named qubit together with the second.

    Should work for all combinations of qubits.

    * Neither qubit entangled: the two states are combined with
      ``np.kron`` and ``Gate.CNOT2_01`` is applied.  A fully separable
      result only updates the second qubit; otherwise the two registers
      are entangled and the first one receives the new state.
    * Otherwise: the registers are entangled with each other if they are
      not already, and the gate ``Gate.CNOT<size>_<i><j>`` is applied,
      where ``<size>`` is the number of qubits in the combined state and
      ``<i>``, ``<j>`` come from ``first_qubit.get_indices(second_qubit)``.

    Raises ``Exception`` when either qubit has been measured before, when
    the entanglement flags contradict the qubit counts, or when ``Gate``
    has no CNOT for the required size and indices.
    """
    first_qubit = self.qubits.get_quantum_register_containing(first_qubit_name)
    second_qubit = self.qubits.get_quantum_register_containing(second_qubit_name)
    if len(first_qubit.get_noop()) > 0 or len(second_qubit.get_noop()) > 0:
        raise Exception("Control or target qubit has been measured previously, no more gates allowed")

    if not first_qubit.is_entangled() and not second_qubit.is_entangled():
        combined_state = np.kron(first_qubit.get_state(), second_qubit.get_state())
        if first_qubit.get_num_qubits() != 1 or second_qubit.get_num_qubits() != 1:
            raise Exception("Both qubits are marked as not entangled but one or the other has an entangled state")
        new_state = Gate.CNOT2_01 * combined_state
        if State.is_fully_separable(new_state):
            second_qubit.set_state(State.get_second_qubit(new_state))
        else:
            self.qubits.entangle_quantum_registers(first_qubit, second_qubit)
            first_qubit.set_state(new_state)
        return

    if not first_qubit.is_entangled_with(second_qubit):
        # Entangle the state
        combined_state = np.kron(first_qubit.get_state(), second_qubit.get_state())
        self.qubits.entangle_quantum_registers(first_qubit, second_qubit)
    else:
        # We are ready to do the operation
        combined_state = first_qubit.get_state()

    # Select the gate by name from the state size and the qubit indices.
    control_qubit_idx, target_qubit_idx = first_qubit.get_indices(second_qubit)
    gate_size = QuantumRegister.num_qubits(combined_state)
    gate_name = 'CNOT%d_%d%d' % (gate_size, control_qubit_idx, target_qubit_idx)
    try:
        # Plain attribute lookup replaces ``exec 'gate=Gate.…'``, which
        # was Python 2 only and hid every error behind a bare except.
        gate = getattr(Gate, gate_name)
    except AttributeError:
        print('gate=Gate.' + gate_name)
        raise Exception("Unrecognized combination of number of qubits")
    first_qubit.set_state(gate * combined_state)
def loadFromURL(self, url):
    """Load an xml file from a URL and return a DOM document.

    ``url`` is opened as a local file when ``isfile(url)`` is true and
    through ``urlopen`` otherwise.  The source is closed before returning
    or raising.

    Raises ParseError, carrying the original error's arguments, when
    ``self.loadDocument`` fails.
    """
    source = open(url, 'r') if isfile(url) is True else urlopen(url)
    try:
        document = self.loadDocument(source)
    except Exception as ex:
        source.close()
        raise ParseError(('Failed to load document %s' % url,) + ex.args)
    source.close()
    return document
def loadFromURL(self, url):
    """Load an xml file from a URL and return a DOM document.

    ``url`` is opened as a local file when ``isfile(url)`` is true and
    through ``urlopen`` otherwise.  The stream is closed on every path
    out of this method.

    Raises ParseError, carrying the original error's arguments, when
    ``self.loadDocument`` fails.
    """
    # Named ``stream`` rather than ``file`` so the builtin is not shadowed.
    if isfile(url) is True:
        stream = open(url, 'r')
    else:
        stream = urlopen(url)
    try:
        return self.loadDocument(stream)
    except Exception as ex:
        raise ParseError(('Failed to load document %s' % url,) + ex.args)
    finally:
        # Also runs for errors that are not Exception subclasses
        # (e.g. KeyboardInterrupt), which used to leave the stream open.
        stream.close()
def __init__(self, param):
self._source_fn = param.get('source')
self._label_fn = param.get('labels')
# bcf_mode: either FILE or MEM, default=FILE
self._bcf_mode = param.get('bcf_mode', 'FILE')
if not os.path.isfile(self._source_fn) or \
not os.path.isfile(self._label_fn):
raise Exception("Either Source of Label file does not exist")
else:
if self._bcf_mode == 'MEM':
self._bcf = bcf_store_memory(self._source_fn)
elif self._bcf_mode == 'FILE':
self._bcf = bcf_store_file(self._source_fn)
self._data = []
self._labels = []