def __init__(self, env):
    """Wrap *env* and cache the flattened sizes of its spaces."""
    self.env = env
    # Observations: a Discrete space is a single scalar index; any other
    # space is flattened to the element count of its shape.
    if isinstance(env.observation_space, Discrete):
        self.state_size = 1
    else:
        self.state_size = numel(env.observation_space.shape)
    # Actions: discrete spaces expose `n` choices; continuous ones are
    # sized from a sampled action.
    discrete = isinstance(self.env.action_space, Discrete)
    self.is_discrete = discrete
    if discrete:
        self.action_size = env.action_space.n
        self.actions = np.arange(self.action_size)
    else:
        self.action_size = numel(env.action_space.sample())
# Example usages of the Discrete() space class, collected from various environments.
def __init__(self, env, keys):
    """Wrap *env*, mapping a fixed set of keys onto a discrete action space."""
    super(DiscreteToFixedKeysVNCActions, self).__init__(env)
    self._keys = keys
    # Build the concrete action list first; the space is sized from it.
    self._generate_actions()
    n_actions = len(self._actions)
    self.action_space = spaces.Discrete(n_actions)
def __init__(self, env, keys):
    """Restrict *env* to a fixed key set exposed as one discrete action per combo."""
    super(DiscreteToFixedKeysVNCActions, self).__init__(env)
    self._keys = keys
    self._generate_actions()
    # One discrete action per generated key combination.
    self.action_space = spaces.Discrete(len(self._actions))
def __init__(self, alpha=0.02, show_number=False):
    """Board env: both actions and observations range over the NUM_LOC cells."""
    self.action_space = spaces.Discrete(NUM_LOC)
    self.observation_space = spaces.Discrete(NUM_LOC)
    self.alpha = alpha
    # 'O' moves first by default.
    self.set_start_mark('O')
    self.show_number = show_number
    self._seed()
    self._reset()
def __init__(self):
    """Line-follower env on a predefined oval course with a mono line sensor."""
    # Angle at which to fail the episode.
    self.theta_threshold_radians = 12 * 2 * math.pi / 360
    self.x_threshold = 2.4

    # Predefined oval course: two straights joined by two semicircular arcs.
    # TODO: original author's note here was lost to an encoding error.
    rad = 190.0
    poly = 16  # number of segments approximating each semicircle
    # Right-hand arc, traced top to bottom.
    self.Course = Walls(240, 50, 640-(50+rad), 50)
    for k in range(1, poly):
        self.Course.addPoint(rad*math.cos(-np.pi/2.0 + np.pi*k/poly)+640-(50+rad),
                             rad*math.sin(-np.pi/2.0 + np.pi*k/poly)+50+rad)
    self.Course.addPoint(240, 50+rad*2)
    # Left-hand arc, closing the loop back to the start.
    for k in range(1, poly):
        self.Course.addPoint(rad*math.cos(np.pi/2.0 + np.pi*k/poly)+(50+rad),
                             rad*math.sin(np.pi/2.0 + np.pi*k/poly)+50+rad)
    self.Course.addPoint(240, 50)

    # Outer boundary box around the whole arena.
    self.BBox = Walls(640, 479, 0, 479)
    self.BBox.addPoint(0, 0)
    self.BBox.addPoint(640, 0)
    self.BBox.addPoint(640, 479)

    # Mono-sensor line-follower agent.
    self.A = Agent((640, 480), 240, 49)

    # Action space: left wheel speed, right wheel speed.
    # Observation space: line detected (True/False).
    self.action_space = spaces.Box(np.array([-1., -1.]), np.array([+1., +1.]))
    self.observation_space = spaces.Discrete(1)

    self._seed()
    self.reset()
    self.viewer = None
    self.steps_beyond_done = None
    self._configure()
def __init__(self, gravity=9.8, masscart=1.0, masspole=0.1, length = .5, force_mag = 10.0):
    """Cart-pole dynamics container; thresholds define when an episode fails."""
    self.gravity = gravity
    self.masscart = masscart
    self.masspole = masspole
    self.total_mass = self.masspole + self.masscart
    self.length = length  # actually half the pole's length
    self.polemass_length = self.masspole * self.length
    self.force_mag = force_mag
    self.tau = 0.02  # seconds between state updates
    # Episode fails past ~12 degrees of tilt or +/-2.4 units of track.
    self.theta_threshold_radians = 12 * 2 * math.pi / 360
    self.x_threshold = 2.4
    # Observation bounds are twice the failure thresholds so that a
    # failing observation still lies inside the Box.
    limit = np.array([
        self.x_threshold * 2,
        np.finfo(np.float32).max,
        self.theta_threshold_radians * 2,
        np.finfo(np.float32).max,
    ])
    self.action_space = spaces.Discrete(2)
    self.observation_space = spaces.Box(-limit, limit)
    self._seed()
    self.viewer = None
    self.state = None
    self.steps_beyond_done = None
def __init__(self):
    """Rocket-landing env: Box2D world plus lazily-created rendering objects."""
    pygame.init()
    self._seed()
    self.viewer = None
    self.world = Box2D.b2World()
    # Simulation bodies are created on reset.
    self.sea_surface = None
    self.falcon_rocket = None
    self.floating_drone_ship = None
    self.particles = []
    self.prev_reward = None
    # Useful observation range is roughly -1..+1, but spikes can exceed it.
    bound = np.array([np.inf] * 8)
    self.observation_space = spaces.Box(-bound, bound)
    if self.continuous:
        # Action is two floats [main engine, left-right engines].
        # Main engine: -1..0 off, 0..+1 throttle from 50% to 100% power;
        # the engine can't work with less than 50% power.
        # Left-right: -1.0..-0.5 fire left engine, +0.5..+1.0 fire right
        # engine, -0.5..0.5 off.
        self.action_space = spaces.Box(-1, +1, (2,))
    else:
        # Nop, fire left engine, main engine, right engine.
        self.action_space = spaces.Discrete(4)
    self._reset()
def __init__(self):
    """Env with one NUM_CLASSES-way observation and a binary choice per class."""
    self.observation_space = spaces.Discrete(NUM_CLASSES)
    # One independent on/off decision for each class.
    self.action_space = spaces.Tuple(
        tuple(spaces.Discrete(2) for _ in range(NUM_CLASSES)))
    self.num_notes = 32  # total number of notes
    self.key = C_MAJOR_KEY
def __init__(self):
    """NES soccer env: wires up ROM/Lua paths and the button-combo action table."""
    super().__init__()
    self.lua_interface_path = os.path.join(package_directory, '../lua/soccer.lua')
    self.rom_file_path = os.path.join(package_directory, '../roms/soccer.nes')
    # Button combinations exposed to the agent; one discrete action each.
    self.actions = [
        'R', 'UR', 'DR',
        'B', 'URB', 'DRB', 'RB',
        'AB', 'RAB', 'URAB', 'DRAB',
    ]
    self.action_space = spaces.Discrete(len(self.actions))
def __init__(self, **kwargs):
    """NES emulator env: screen buffer, button table, and emulator IPC placeholders."""
    utils.EzPickle.__init__(self)
    self.curr_seed = 0
    self.screen = np.zeros((SCREEN_HEIGHT, SCREEN_WIDTH, 3), dtype=np.uint8)
    self.closed = False
    self.can_send_command = True
    self.command_cond = Condition()
    self.viewer = None
    self.reward = 0
    # Episode length in emulator steps: wall-clock seconds * fps / frame skip.
    secs = 7
    skip = 5
    fps = 60
    self.episode_length = secs * fps / skip
    # Button combinations exposed to the agent.
    self.actions = [
        'U', 'D', 'L', 'R',
        'UR', 'DR', 'URA', 'DRB',
        'A', 'B', 'RB', 'RA']
    self.action_space = spaces.Discrete(len(self.actions))
    self.frame = 0
    # Placeholders for communication with the emulator process; filled in
    # when the emulator is started.
    self.pipe_in = None
    self.pipe_out = None
    self.thread_incoming = None
    self.rom_file_path = None
    self.lua_interface_path = None
    self.emulator_started = False
## ---------- gym.Env methods -------------
def __init__(self, nS, nA, P, isd):
    """Tabular env: nS states, nA actions, transition table P, initial state dist isd."""
    self.P = P
    self.isd = isd
    self.lastaction = None  # for rendering
    self.nS = nS
    self.nA = nA
    self.action_space = spaces.Discrete(nA)
    self.observation_space = spaces.Discrete(nS)
    self._seed()
    self._reset()
def __init__(self, natural=False):
    """Blackjack env with two actions and a 3-part discrete observation tuple."""
    self.action_space = spaces.Discrete(2)
    self.observation_space = spaces.Tuple((
        spaces.Discrete(32),
        spaces.Discrete(11),
        spaces.Discrete(2)))
    self._seed()
    # Flag to pay out 1.5x on a "natural" blackjack win, like casino rules.
    # Ref: http://www.bicyclecards.com/how-to-play/blackjack/
    self.natural = natural
    # Start the first game.
    self._reset()
    self.nA = 2  # number of actions
def __init__(self):
    """Toy env: two discrete actions, two discrete observations."""
    super(ConstantEnv, self).__init__()
    self.observation_space = spaces.Discrete(2)
    self.action_space = spaces.Discrete(2)
def __init__(self):
    """Toy env: binary action and binary observation spaces."""
    super(RandomInputConstantGoalEnv, self).__init__()
    self.observation_space = spaces.Discrete(2)
    self.action_space = spaces.Discrete(2)
def __init__(self):
    """Toy env where both spaces are two-way discrete."""
    super(DependentEnv, self).__init__()
    self.observation_space = spaces.Discrete(2)
    self.action_space = spaces.Discrete(2)
def space_shape(space):
    """Return the shape of tensor expected for a given space."""
    # Discrete spaces have no .shape worth using; size them by n.
    if isinstance(space, spaces.Discrete):
        return [space.n]
    return space.shape
def __init__(self, env, keys):
    """Wrap *env* with a fixed key set and track which keys are held down."""
    super(DiscreteToFixedKeysVNCActions, self).__init__(env)
    self._keys = keys
    self._generate_actions()
    n_actions = len(self._actions)
    self.action_space = spaces.Discrete(n_actions)
    # Per-key pressed/released bookkeeping.
    self.key_state = FixedKeyState(keys)
def __init__(self, env, actions):
    """Discretize a continuous action space into the given fixed action set."""
    super().__init__(env)
    continuous_space = self.env.action_space
    assert isinstance(continuous_space, Box), "action space not continuous"
    # Each row of `actions` is one concrete point in the continuous space.
    self.actions = np.array(actions)
    assert self.actions.shape[1:] == continuous_space.shape, "shape of actions does not match action space"
    self.action_space = Discrete(self.actions.shape[0])
def __init__(self, sha256list, random_sample=True, maxturns=3, output_path='evaded/blackbox/', cache=False):
    """Black-box malware-evasion env over PE samples identified by sha256.

    :param sha256list: sha256 hashes of the samples to manipulate
    :param random_sample: draw samples randomly rather than sequentially
    :param maxturns: maximum manipulation steps per episode
    :param output_path: output directory, resolved relative to the package root
    :param cache: if True, prefetch all sample bytes into ``self.samples``
    """
    self.cache = cache
    self.available_sha256 = sha256list
    self.action_space = spaces.Discrete(len(ACTION_LOOKUP))
    self.maxturns = maxturns
    self.feature_extractor = pefeatures.PEFeatureExtractor()
    self.random_sample = random_sample
    self.sample_iteration_index = 0
    # Resolve output_path against the package root (three levels above this file).
    self.output_path = os.path.join(
        os.path.dirname(
            os.path.dirname(
                os.path.dirname(
                    os.path.abspath(__file__)))), output_path)
    # Bug fix: create the resolved directory; the old code tested/created the
    # raw `output_path`, i.e. a path relative to the current working directory.
    if not os.path.exists(self.output_path):
        os.makedirs(self.output_path)
    self.history = OrderedDict()
    self.samples = {}
    if self.cache:
        for sha256 in self.available_sha256:
            try:
                # Bug fix: fetch the loop's sha256; `self.sha256` does not
                # exist at this point and raised AttributeError.
                self.samples[sha256] = interface.fetch_file(sha256)
            except interface.FileRetrievalFailure:
                print("failed fetching file")
                continue  # try a new sha256...this one can't be retrieved from storage
    self._reset()
def __init__(self, sha256list, random_sample=True, maxturns=3, output_path='evaded/score/', cache=False):
    """Score-based malware-evasion env over PE samples identified by sha256.

    :param sha256list: sha256 hashes of the samples to manipulate
    :param random_sample: draw samples randomly rather than sequentially
    :param maxturns: maximum manipulation steps per episode
    :param output_path: output directory, resolved relative to the package root
    :param cache: if True, prefetch all sample bytes into ``self.samples``
    """
    self.cache = cache
    self.available_sha256 = sha256list
    self.action_space = spaces.Discrete(len(ACTION_LOOKUP))
    self.maxturns = maxturns
    self.feature_extractor = pefeatures.PEFeatureExtractor()
    self.random_sample = random_sample
    self.sample_iteration_index = 0
    # Resolve output_path against the package root (three levels above this file).
    self.output_path = os.path.join(
        os.path.dirname(
            os.path.dirname(
                os.path.dirname(
                    os.path.abspath(__file__)))), output_path)
    # Bug fix: create the resolved directory; the old code tested/created the
    # raw `output_path`, i.e. a path relative to the current working directory.
    if not os.path.exists(self.output_path):
        os.makedirs(self.output_path)
    self.history = OrderedDict()
    self.samples = {}
    if self.cache:
        for sha256 in self.available_sha256:
            try:
                # Bug fix: the old code did `self.bytez = interface.fetch_file(self.sha256)`,
                # which referenced a nonexistent attribute and, even if it worked,
                # would overwrite a single attribute on every iteration (and be
                # clobbered again by _reset). Cache per-sample bytes like the
                # sibling black-box env does.
                self.samples[sha256] = interface.fetch_file(sha256)
            except interface.FileRetrievalFailure:
                print("failed fetching file")
                continue  # try a new sha256...this one can't be retrieved from storage
    self._reset()  # self.original_score, self.bytez and self.observation_space get set here