def test_rejection_sample(self):
rnd = np.random.RandomState(42)
data = self.data['binary']
joker = TheJoker(self.joker_params['binary'], random_state=rnd)
with pytest.raises(ValueError):
joker.rejection_sample(data)
joker.rejection_sample(data, n_prior_samples=128)
# check that jitter is always set to the fixed value
jitter = 5.*u.m/u.s
params = JokerParams(P_min=8*u.day, P_max=1024*u.day, jitter=jitter)
joker = TheJoker(params)
prior_samples = joker.sample_prior(128)
assert quantity_allclose(prior_samples['jitter'], jitter)
full_samples = joker.rejection_sample(data, n_prior_samples=128)
assert quantity_allclose(full_samples['jitter'], jitter)
python类m()的实例源码
testing_support.py 文件源码
项目:algorithm-reference-library
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def create_LOFAR_configuration(antfile: str, meta: dict = None) -> Configuration:
""" Define from the LOFAR configuration file
:param antfile:
:param meta:
:return: Configuration
"""
antxyz = numpy.genfromtxt(antfile, skip_header=2, usecols=[1, 2, 3], delimiter=",")
nants = antxyz.shape[0]
assert antxyz.shape[1] == 3, "Antenna array has wrong shape %s" % antxyz.shape
anames = numpy.genfromtxt(antfile, dtype='str', skip_header=2, usecols=[0], delimiter=",")
mounts = numpy.repeat('XY', nants)
location = EarthLocation(x=[3826923.9] * u.m, y=[460915.1] * u.m, z=[5064643.2] * u.m)
fc = Configuration(location=location, names=anames, mount=mounts, xyz=antxyz, frame='global',
diameter=35.0)
return fc
def density(self, density: u.kg/u.m**3):
"""Sets the simulation's density profile to the specified array.
Other arrays which depend on the density values, such as the kinetic
pressure, are then redefined automatically.
Parameters
----------
density : ndarray
Array of density values. Shape and size must be equal to those of
the simulation grid.
Must have units of density.
"""
assert density.shape == self.domain_shape, """
Specified density array shape {} does not match simulation grid {}.
""".format(density.shape, self.domain_shape)
self._density = density
# Momentum
def momentum(self, momentum: u.kg/(u.m**2 * u.s)):
"""Sets the simulation's momentum profile to the specified array.
Other arrays which depend on the velocity values, such as the kinetic
pressure,
are then redefined automatically.
Parameters
----------
momentum : ndarray
Array of momentum vectors. Shape must be (3, x, [y, z]), where x,
y and z are the dimensions of the simulation grid.
Note that a full 3D vector is necessary even if the simulation has
fewer than 3 dimensions.
"""
assert momentum.shape == (3, *self.domain_shape), """
Specified density array shape {} does not match simulation grid {}.
""".format(momentum.shape, (3, *self.domain_shape))
self._momentum = momentum
# Internal energy
def energy(self, energy: u.J/u.m**3):
"""Sets the simulation's total energy density profile to the specified array.
Other arrays which depend on the energy values, such as the kinetic
pressure, are then redefined automatically.
Parameters
----------
energy : ndarray
Array of energy values. Shape must be (x, y, z), where x, y, and z
are the grid sizes of the simulation in the x, y, and z dimensions.
Must have units of energy.
"""
assert energy.shape == self.domain_shape, """
Specified density array shape {} does not match simulation grid {}.
""".format(energy.shape, self.domain_shape)
self._energy = energy
# Magnetic field
def velocity(self, velocity: u.m / u.s):
"""Defines the velocity throughout the simulation, and automatically
updates the momentum based on the current density values.
Parameters
----------
velocity : ndarray
Array of velocity vectors with shape (3, x, [y, z]) where x, y and
z are the spatial grid sizes of the simulation.
Note that a full 3D vector is required even if the simulation is
run for fewer than 3 dimensions.
Must have units of velocity.
"""
assert velocity.shape == (3, *self.domain_shape), """Specified velocity
array shape does not match simulation grid."""
self.momentum = velocity * self.density
def rho_as_angle(rho, eph):
"""Projected linear distance to angular distance.
Parameters
----------
rho : `~astropy.units.Quantity`
Projected distance in units of length.
eph : dictionary-like or `~sbpy.data.Ephem`
Ephemerides; requires geocentric distance as `delta`.
Returns
-------
rho_l : `~astropy.units.Quantity`
"""
if rho.unit.is_equivalent(u.m):
rho_a = np.arctan(rho / eph['delta'].to(u.m))
else:
assert rho.unit.is_equivalent(u.rad)
rho_a = rho
return rho_a
def rho_as_length(rho, eph):
"""Angular distance to projected linear distance.
Parameters
----------
rho : `~astropy.units.Quantity`
Projected distance in units of angle.
eph : dictionary-like or `~sbpy.data.Ephem`
Ephemerides; requires geocentric distance as `delta`.
Returns
-------
rho_l : `~astropy.units.Quantity`
"""
if rho.unit.is_equivalent(u.rad):
rho_l = eph['delta'].to(u.m) * np.tan(rho)
else:
assert rho.unit.is_equivalent(u.m)
rho_l = rho
return rho_l
def setup(self):
mjd = np.linspace(55555., 56555., 256)
self.data = RVData(mjd, np.sin(mjd)*u.km/u.s,
stddev=0.1*np.random.uniform(size=mjd.size)*u.km/u.s)
n = 128
samples = dict()
samples['P'] = np.random.uniform(size=n) * u.day
samples['phi0'] = np.random.uniform(size=n) * u.radian
samples['omega'] = np.random.uniform(size=n) * u.radian
samples['ecc'] = np.random.uniform(size=n) * u.one
samples['jitter'] = np.random.uniform(size=n) * u.m/u.s
self.samples = samples
self.n = n
def test_shit():
joker_params = JokerParams(P_min=8*u.day, P_max=32768*u.day, jitter=0*u.m/u.s)
joker = TheJoker(joker_params)
t = np.random.uniform(0, 250, 16) + 56831.324
t.sort()
rv = np.cos(t)
rv_err = np.random.uniform(0.1, 0.2, t.size)
data = RVData(t=t, rv=rv*u.km/u.s, stddev=rv_err*u.km/u.s)
samples = joker.sample_prior(size=16384)
chunk = []
for k in samples:
chunk.append(np.array(samples[k]))
chunk = np.ascontiguousarray(np.vstack(chunk).T)
t0 = time.time()
cy_ll = batch_marginal_ln_likelihood(chunk, data, joker_params)
print("Cython:", time.time() - t0)
t0 = time.time()
n_chunk = len(chunk)
py_ll = np.zeros(n_chunk)
for i in range(n_chunk):
try:
py_ll[i] = marginal_ln_likelihood(chunk[i], data, joker_params)
except Exception as e:
py_ll[i] = np.nan
print("Python:", time.time() - t0)
assert np.allclose(np.array(cy_ll), py_ll)
def test_plotting():
# check that plotting at least succeeds with allowed arguments
t = np.random.uniform(55555., 56012., size=128)
rv = 100 * np.sin(0.5*t) * u.km/u.s
ivar = 1 / (np.random.normal(0,5,size=t.size)*u.km/u.s)**2
data = RVData(t=t, rv=rv, ivar=ivar)
data.plot()
# style
data.plot(color='r')
# custom axis
fig,ax = plt.subplots(1,1)
data.plot(ax=plt.gca())
# formatting
data.plot(rv_unit=u.m/u.s)
data.plot(rv_unit=u.m/u.s, time_format='jd')
data.plot(rv_unit=u.m/u.s, time_format=lambda x: x.utc.mjd)
# try with no errors
data = RVData(t=t, rv=rv)
data.plot()
data.plot(ecolor='r')
plt.close('all')
def GetAltAzHESS(coords, date='2016-06-06 00:00:00'):
'''
Get AltAz of a source at a given date for the HESS site
Parameters
----------
coords : CoordinatesHandler object
date : date for which the results is valid. Format is YYYY-MM-DD HH:MM:SS
'''
#Site Location
print "At HESS Location at ",date
location = EarthLocation(lat = '-23d16m18s' , lon = '16d30m00s', height = 1800*u.m)
time = Time(date, format='iso', scale='utc')
return _GetAltAz(coords,location,time)
def GetAltAzLAPALMA(coords, date='2016-06-06 00:00:00'):
'''
Get AltAz of a source at a given date for the HELAPALMASS site
Parameters
----------
coords : CoordinatesHandler object
date : date for which the results is valid. Format is YYYY-MM-DD HH:MM:SS
'''
#Site Location
print "At CTA North Location at ",date
location = EarthLocation(lat = '28d45m43s' , lon = '-17d53m24s', height = 1800*u.m)
time = Time(date, format='iso', scale='utc')
return _GetAltAz(coords,location,time)
testing_support.py 文件源码
项目:algorithm-reference-library
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def create_configuration_from_file(antfile: str, name: str = None, location: EarthLocation = None,
mount: str = 'altaz',
names: str = "%d", frame: str = 'local',
diameter=35.0,
meta: dict = None,
rmax=None, **kwargs) -> Configuration:
""" Define from a file
:param names:
:param antfile: Antenna file name
:param name: Name of array e.g. 'LOWBD2'
:param location:
:param mount: mount type: 'altaz', 'xy'
:param frame: 'local' | 'global'
:param diameter: Effective diameter of station or antenna
:param meta: Any meta info
:return: Configuration
"""
antxyz = numpy.genfromtxt(antfile, delimiter=",")
assert antxyz.shape[1] == 3, ("Antenna array has wrong shape %s" % antxyz.shape)
if frame == 'local':
latitude = location.geodetic[1].to(u.rad).value
antxyz = xyz_at_latitude(antxyz, latitude)
if rmax is not None:
lantxyz = antxyz - numpy.average(antxyz, axis=0)
r = numpy.sqrt(lantxyz[:, 0] ** 2 + lantxyz[:, 1] ** 2 + lantxyz[:, 2] ** 2)
antxyz = antxyz[r < rmax]
log.debug('create_configuration_from_file: Maximum radius %.1f m includes %d antennas/stations' %
(rmax, antxyz.shape[0]))
nants = antxyz.shape[0]
anames = [names % ant for ant in range(nants)]
mounts = numpy.repeat(mount, nants)
fc = Configuration(location=location, names=anames, mount=mounts, xyz=antxyz, frame=frame,
diameter=diameter)
return fc
def __init__(self, domain_x, domain_y, domain_z, gamma=5/3):
# Define domain sizes
self.x = domain_x
self.y = domain_y
self.z = domain_z
x, y, z = self.x.si.value, self.y.si.value, self.z.si.value
self.grid = np.meshgrid(x, y, z, indexing='ij') * u.m
self.grid = np.squeeze(self.grid)
self.domain_shape = []
for length in (len(x), len(y), len(z)):
if length > 1:
self.domain_shape.append(length)
self.domain_shape = tuple(self.domain_shape)
print(self.domain_shape)
self.gamma = gamma
# Initiate core plasma variables
self._density = np.zeros(self.domain_shape) * u.kg / u.m**3
self._momentum = np.zeros((3, *self.domain_shape)) * u.kg \
/ (u.m**2 * u.s)
self._energy = np.zeros(self.domain_shape) * u.J / u.m**3
self._magnetic_field = np.zeros((3, *self.domain_shape)) * u.T
# Collect core variables into a list for usefulness
self.core_variables
# Connect a simulation object for simulating
self.simulation_physics = MHDSimulation(self)
def _planck(self, lam, Teff):
"""
Computes monochromatic blackbody intensity in W/m^3 using the
Planck function.
@lam: wavelength in m
@Teff: effective temperature in K
Returns: monochromatic blackbody intensity
"""
return 2*self.h*self.c*self.c/lam**5 * 1./(np.exp(self.h*self.c/lam/self.k/Teff)-1)
def loadFile(input_file):
df = pd.read_csv(input_file).dropna()
x = df['stereo:estimated_direction:x']
y = df['stereo:estimated_direction:y']
z = df['stereo:estimated_direction:z']
_, lat, lon = cartesian_to_spherical(
x.values * u.m, y.values * u.m, z.values * u.m)
alt = Angle(90 * u.deg - lat).degree
az = Angle(lon).wrap_at(180 * u.deg).degree
df['alt'] = alt
df['az'] = az
return df
def altAz(self, df):
x = df['stereo:estimated_direction:x']
y = df['stereo:estimated_direction:y']
z = df['stereo:estimated_direction:z']
_, lat, lon = cartesian_to_spherical(
x * u.m, y * u.m, z * u.m)
alt = Angle(90 * u.deg - lat).degree
az = Angle(lon).wrap_at(180 * u.deg).degree
return alt, az
def dbquantity():
return DBQuantity(column("value"), u.m)
def test_dbquantity_add_same_units(dbquantity):
expr = (dbquantity + 100*u.m).value # value + :value_1
value_1 = expr.right.value
assert_almost_equal(value_1, 100)
def test_dbquantity_radd(dbquantity):
expr = (100*u.m + dbquantity).value # :value_1 + value
value_1 = expr.left.value
assert_almost_equal(value_1, 100)
def test_dbquantity_sub_same_untis(dbquantity):
expr = (dbquantity - 100*u.m).value # value - :value_1
value_1 = expr.right.value
assert_almost_equal(value_1, 100)
def test_dbquantity_rsub_same_units(dbquantity):
expr = (100*u.m - dbquantity).value # :value_1 - value
value_1 = expr.left.value
assert_almost_equal(value_1, 100)
def test_dbquantity_div_another_unit(dbquantity):
q = dbquantity/(2*u.m)
expr = q.value
value_1 = expr.right.value
assert_almost_equal(value_1, 2)
assert q.unit == u.Unit("")
def test_dbquantity_greater_same_units(dbquantity):
expr = dbquantity > 100*u.m # value > :value_1
value_1 = expr.right.value
assert_almost_equal(value_1, 100)
def test_exponential():
p = exponential(x="z",A="P_0",x_0="z_0",x_s="z_s")
P_0 = 100.0*apu.kPa
z_0 = 0.0*apu.m
z_s = -100.0*apu.m
z = apu.Quantity(np.linspace(0,1000.,100),"m")
p.set_param_values(P_0=P_0, z_0=z_0, z_s=z_s)
assert_allclose(p(z).value, (P_0*np.exp((z-z_0)/z_s)).value)
assert str(p(z).unit) == str(P_0.unit)
def convert_time(in_time):
"""Converts time to seconds since epoch
Args:
in_time (str): time obtained from header's keyword DATE-OBS
Returns:
time in seconds since epoch
"""
return time.mktime(time.strptime(in_time, "%Y-%m-%dT%H:%M:%S.%f"))
def print_default_args(args):
"""Print default values of arguments.
This is mostly helpful for debug but people not familiar with the software
might find it useful as well
Notes:
This function is deprecated.
Notes:
This is not dynamically updated so use with caution
Args:
args (object): An argparse instance
"""
arg_name = {'auto_clean': '--auto-clean',
'clean_cosmic': '-c, --cosmic',
'debug_mode': '--debug',
'flat_normalize': '--flat-normalize',
'ignore_bias': '--ignore-bias',
'log_to_file': '--log-to-file',
'norm_order': '--flat-norm-order',
'raw_path': '--raw-path',
'red_path': '--red-path',
'saturation_limit': '--saturation',
'destiny': '-d --proc-path',
'interactive_ws': '-i --interactive',
'lamp_all_night': '-r --reference-lamp',
'lamp_file': '-l --lamp-file',
'output_prefix': '-o --output-prefix',
'pattern': '-s --search-pattern',
'procmode': '-m --proc-mode',
'reference_dir': '-R --reference-files',
'source': '-p --data-path',
'save_plots': '--save-plots',
'dcr_par_dir': '--dcr-par-dir'}
for key in args.__dict__:
log_ccd.debug('Default value for {:s} is {:s}'.format(
arg_name[key],
str(args.__getattribute__(key))))
def test_from_flam(self):
fluxd = 6.730018324465526e-14 * u.W / u.m**2 / u.um
aper = 1 * u.arcsec
eph = dict(rh=1.5 * u.au, delta=1.0 * u.au)
S = 1869 * u.W / u.m**2 / u.um
afrho = Afrho.from_fluxd(None, fluxd, aper, eph, S=S)
assert np.isclose(afrho.cm, 1000)
def test_fluxd(self):
afrho = Afrho(1000, 'cm')
aper = 1 * u.arcsec
eph = dict(rh=1.5 * u.au, delta=1.0 * u.au)
S = 1869 * u.W / u.m**2 / u.um
fluxd = afrho.fluxd(None, aper, eph, S=S)
assert fluxd.unit.is_equivalent(S.unit)
assert np.isclose(fluxd.value, 6.730018324465526e-14)