"""This module implements the Anti-Poaching game environment with
M poachers and N rangers.
:class:`raw_env` implements APE as a PettingZoo environment, but it is recommended
to use :meth:`parallel_env` to obtain a fully formed environment. This is because
:meth:`parallel_env` handles the creation of the :class:`utils.game_utils.BaseGridState`
object that encapsulates the game state and is internally used by the APE instance.
"""
import functools
from copy import deepcopy

import gymnasium as gym
import numpy as np

from .utils.game_utils import BaseGridState, GridStateConstProb, Trap
from .utils.typing import *
# Game metadata as global
metadata = {
"name": "anti_poaching_v0.3",
"render_modes": BaseGridState.RENDER_MODES,
"is_parallelizable": True,
}
def parallel_env(
    grid_size: int = 10,
    nrangers: int = 2,
    npoachers: int = 2,
    ntraps_per_poacher: int = 3,
    prob_detect_cell: float = 0.2,
    prob_animal_appear: float = 0.2,
    prob_detect_trap: float = 0.2,
    max_time: int = 200,
    seed: int = None,
    render_mode: str = "ansi",
    grid_class: type[BaseGridState] = GridStateConstProb,
) -> ParallelEnv:
"""Factory function to instantiate an AntiPoachingGame.
:param grid_size: The size of the square grid to use. Defaults to 10.
:type grid_size: int, optional
    :param nrangers: The number of ranger agents in this instance.
        Defaults to 2.
    :type nrangers: int, optional
    :param npoachers: The number of poacher agents in this instance.
        Defaults to 2.
    :type npoachers: int, optional
    :param ntraps_per_poacher: The number of traps each poacher starts the
        game with. Defaults to 3.
    :type ntraps_per_poacher: int, optional
:param prob_detect_cell: Probability that an agent detects another agent.
Defaults to 0.2.
:type prob_detect_cell: float, optional
:param prob_animal_appear: Probability that an animal is captured in a
trap. Defaults to 0.2.
:type prob_animal_appear: float, optional
:param prob_detect_trap: Probability that an agent detects a trap. Defaults
to 0.2.
:type prob_detect_trap: float, optional
:param max_time: The horizon of the game, or the timestep at which
the game is automatically terminated. Defaults to 200.
:type max_time: int, optional
    :param seed: The seed of the environment. Required to reproduce the same
        environment. Note that the `default_rng` is used to generate seeds
        for the locations, action and observation spaces of each agent using
        the :meth:`raw_env.tap` method. `None` by default i.e. unseeded.
    :type seed: int, optional
:param render_mode: Render mode to visualise the environment. Can be `rgb`
or `ansi` (default).
:type render_mode: str, optional
    :param grid_class: Grid class to use to store the game state. This can be
        changed to use environments where prob_animal_appear varies. Currently,
        supported values are :class:`utils.game_utils.GridStateConstProb` (default)
        and :class:`utils.game_utils.GridStateVaryingPCA`. New
        grid classes must implement the :class:`utils.game_utils.BaseGridState`
        interface.
    :type grid_class: type[BaseGridState], optional
:return: Initialised APE environment.
:rtype: raw_env
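
    A minimal usage sketch (the parameter values here are illustrative only):

    >>> env = parallel_env(grid_size=5, nrangers=1, npoachers=1, seed=42)
    >>> obs, infos = env.reset()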
"""
# Create the agents
rangers = [f"ranger_{i}" for i in range(nrangers)]
poachers = [f"poacher_{i}" for i in range(npoachers)]
# Create the grid-object that stores the state
grid = grid_class(
grid_size=grid_size,
rangers=rangers,
poachers=poachers,
ntraps_per_poacher=ntraps_per_poacher,
prob_animal_appear=prob_animal_appear,
prob_detect_cell=prob_detect_cell,
prob_detect_trap=prob_detect_trap,
seed=seed,
render_mode=render_mode,
)
return raw_env(
grid,
rangers,
poachers,
ntraps_per_poacher,
max_time,
seed,
)
class raw_env(ParallelEnv):
    """This implements APE as a PettingZoo ParallelEnv environment.

    :param grid: This is the :class:`utils.game_utils.BaseGridState` object that
        handles the game state.
    :type grid: BaseGridState
    :param rangers: The list of Ranger IDs to use for the game.
    :type rangers: list[str]
    :param poachers: The list of Poacher IDs to use for the game.
    :type poachers: list[str]
    :param ntraps_per_poacher: The number of traps each poacher starts the
        game with.
    :type ntraps_per_poacher: int
    :param max_time: The maximum number of timesteps before the game is
        automatically terminated.
    :type max_time: int
    :param seed: The seed to initialise the `self.rng` object. Note that
        this will be used to seed all action/observation spaces, as well
        as the initial locations of the agents in `self.grid`.
    :type seed: int, optional
    """

    def tap(self) -> int:
        """Taps into the random number generator to give a random integer.
        This is used to generate new pseudo-random seeds for the spaces when reset.

        :return: An integer taken from the `self.rng` object.
        :rtype: int
        """
        return int(self.rng.integers(1, 1e6, 1))
    def __init__(
        self,
        grid: BaseGridState,
        rangers: list,
        poachers: list,
        ntraps_per_poacher: int,
        max_time: int,
        seed: int = None,
    ):
        self.seed = seed
        self.rng = np.random.default_rng(seed=seed)

        # time properties
        self.max_time = max_time
        self.curr_time = 0

        # agent parameters
        self.ntraps_per_poacher = ntraps_per_poacher

        # agent-related properties
        self.poachers = poachers
        self.rangers = rangers
        self.agents = self.rangers + self.poachers
        self.possible_agents = self.agents[:]
        self.poacher_traps = {
            poacher: [
                Trap(name=f"trap_{i}_{poacher}")
                for i in range(self.ntraps_per_poacher)
            ]
            for poacher in self.poachers
        }

        # Arena-related properties
        self.grid = grid

        nrangers, npoachers = len(rangers), len(poachers)

        # Convenience attributes
        self._ranger_obs_size = 8 + nrangers
        self._poacher_obs_size = 7

        # Spaces parameters.
        self.action_spaces = {
            **{
                ranger: gym.spaces.Discrete(5, seed=self.tap())
                for ranger in self.rangers
            },
            **{
                poacher: gym.spaces.Discrete(6, seed=self.tap())
                for poacher in self.poachers
            },
        }
        self.observation_spaces = {
            **{
                ranger: gym.spaces.Dict(
                    {
                        "observations": gym.spaces.Box(
                            np.zeros(self._ranger_obs_size),  # low
                            np.array(  # high
                                [
                                    max_time,  # max time
                                    *[self.grid.N] * 2,  # location
                                    *[1] * nrangers,  # partner rangers
                                    npoachers,  # #captured-poachers
                                    *[
                                        ntraps_per_poacher * npoachers,
                                        np.iinfo(INTEGER).max,
                                    ],  # poacher-captured traps
                                    *[
                                        ntraps_per_poacher * npoachers,
                                        np.iinfo(INTEGER).max,
                                    ],  # grid-captured traps
                                ]
                            ),
                            dtype=INTEGER,
                        ),
                        "action_mask": gym.spaces.MultiBinary(5),
                    }
                )
                for ranger in self.rangers
            },
            **{
                poacher: gym.spaces.Dict(
                    {
                        "observations": gym.spaces.Box(
                            np.array([0, *[-1] * 2, *[0] * 2, 0, 0]),  # low
                            np.array(  # high
                                [
                                    max_time,  # max time
                                    *[self.grid.N] * 2,  # location
                                    ntraps_per_poacher,  # #traps
                                    np.iinfo(INTEGER).max,  # #prey
                                    nrangers,  # #rangers detected
                                    npoachers,  # #poachers detected
                                ]
                            ),
                            dtype=INTEGER,
                        ),
                        "action_mask": gym.spaces.MultiBinary(6),
                    }
                )
                for poacher in self.poachers
            },
        }

    def reset(self, seed: int = None, options: dict = None) -> tuple:
        """Resets the environment for the next episode. If new configurations
        are to be set, they are passed through the `options` dictionary
        (see the sketch below this parameter list). A simple seeded reset:

        >>> env.reset(seed=123)

        Here, a None `seed` means that the internal RNG of our GridState object
        is randomly reset, and thus will generate a new starting position.

:param seed: The seed value to be used. `None` by default.
:type seed: int, optional
:param options: Options that will be forwarded to :meth:`BaseGridState.reset`\
for the reset. Currently for forward compatibility only.
:type options: dict, optional
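
        Options are forwarded to the grid, so the accepted keys depend on
        the grid class in use; an empty dict is a safe illustrative value:

        >>> obs, infos = env.reset(seed=123, options={})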
"""
        self.rng = np.random.default_rng(seed=seed)

        # Reset the game parameters: Only override if supplied.
        self.curr_time = 0
        seed = seed if seed else self.seed
        options = options if options is not None else {}

        # pass options to grid reset:
        self.grid.reset(seed=seed, **options)
# Regenerate agents.
self.agents = self.possible_agents[:]
self.poacher_traps = {
poacher: [
Trap(name=f"trap_{i}_{poacher}")
for i in range(self.ntraps_per_poacher)
]
for poacher in self.poachers
}
        obs = dict.fromkeys(self.agents)  # Returning this

# Creating a default object to copy for both agents
_def_ranger_obs = self.observation_space("ranger_0").sample()
_def_poacher_obs = self.observation_space("poacher_0").sample()
# and zero-ing them out since most init obs are zero.
_def_ranger_obs["observations"] = np.zeros_like(
_def_ranger_obs["observations"], dtype=INTEGER
)
_def_poacher_obs["observations"] = np.zeros_like(
_def_poacher_obs["observations"], dtype=INTEGER
)
for agent in self.agents:
# Re-seed/reinitialise all spaces
self.observation_space(agent).seed(self.tap())
self.action_space(agent).seed(self.tap())
# create appropriate observations for t=0
            _copy_obj, _size = (
                (_def_ranger_obs, 3)
                if "ranger" in agent
                else (_def_poacher_obs, 5)
            )
obs[agent] = deepcopy(_copy_obj)
obs[agent]["observations"][0] = self.max_time
obs[agent]["observations"][1:_size] = self.grid.state[agent]
obs[agent]["action_mask"] = self.grid.permitted_movements(agent)
        # returning infos dictionary with reset; each entry must itself
        # be a dict under the PettingZoo API
        return obs, dict.fromkeys(self.agents, {"curr_time": self.curr_time})

    def step(self, actions: dict) -> tuple:
        """Receives a joint action, and sends the rewards and observations
for the next state.
:param actions: Dictionary mapping agent IDs to legal actions for
this step.
:type actions: dict
:return: A tuple of dictionaries over the agent IDs. These are the
Observations, Rewards, Terminated statuses, Truncated statuses
and Infos.
:rtype: tuple[dict]
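
        A minimal usage sketch (actions sampled uniformly from each agent's
        action space; the returned action masks are ignored here for brevity):

        >>> actions = {a: env.action_space(a).sample() for a in env.agents}
        >>> obs, rewards, terminations, truncations, infos = env.step(actions)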
"""
self.curr_time += 1
# creating local objects ...
rewards = dict.fromkeys(self.agents, 0.0)
terminations = dict.fromkeys(self.agents, False)
truncations = dict.fromkeys(self.agents, False)
infos = dict.fromkeys(self.agents, {})
# ... and the obs dict, with dummy action masks for now.
        obs = dict.fromkeys(self.agents)
        for agent in self.agents:
            _size = (
self._ranger_obs_size
if "ranger" in agent
else self._poacher_obs_size
)
obs[agent] = {
"observations": np.zeros(_size, dtype=INTEGER),
"action_mask": None,
}
        # Now we run through the transitions! The obs
        # dictionary is populated by each helper function.
        # Step 1: Rangers move
        self._rangers_move(actions, obs)
# Step 2: Poachers move and remove their traps
self._poachers_move_and_get_traps(actions, rewards, obs)
# Step 3: Rangers remove traps and remaining traps capture animals
self._rangers_remove_traps(rewards, obs)
self._traps_catch_animals()
# Step 4: Rangers remove poachers and remaining poachers place traps
self._rangers_remove_poachers(rewards, obs)
self._poachers_place_traps(actions)
# update the obs for the agents with time, new state and action masks
time_status = self.curr_time >= self.max_time
for agent in self.agents:
obs[agent]["observations"][0] = self.max_time - self.curr_time
if "ranger" in agent:
obs[agent]["observations"][1:3] = self.grid.state[agent]
obs[agent]["action_mask"] = self.grid.permitted_movements(
agent
)
elif "poacher" in agent:
_action_mask = self.grid.permitted_movements(agent)
_action_mask[5] = int(len(self.poacher_traps[agent]) > 0)
obs[agent]["observations"][1:5] = self.grid.state[agent]
obs[agent]["action_mask"] = _action_mask
# update terminations for next step. Note that all agents are
# technically alive until max_time: captured poachers are just
# in a captured state.
terminations[agent] |= time_status
# Agents are terminated on the last step.
self.agents = [] if time_status else self.agents
return obs, rewards, terminations, truncations, infos
"""This method forwards the render call to `self.grid` i.e.
it calls `self.grid.render()`, which will then use the
chosen `render_mode` to render the game.
"""
self.grid.render()
"""Returns `self.grid`'s internal state representation.
:return: Dictionary representation of the state. This maps
agent IDs to their locations + traps/prey status, if any,
and the locations of all active traps on the grid.
:rtype: dict
"""
return self.grid.state
def _assign_reward(
self, poacher: AgentID, reward: float, rewards: dict
) -> None:
        """Assigns the reward to poacher, and splits the reward among the
        cooperative rangers. A positive reward adds to the poacher and
        removes proportionally from all rangers, keeping the game zero-sum:
        for example, with two rangers, ``reward=+4`` gives the poacher +4
        and each ranger -2."""
rewards[poacher] += reward
for ranger in self.rangers:
rewards[ranger] -= reward / len(self.rangers)
def _rangers_move(self, actions: dict, obs: dict) -> None:
"""Helper function to move the rangers and update their obs
        with detected partners in the same cell."""
        for ranger in [r for r in self.rangers if 1 <= actions[r] <= 4]:
            self.grid.update_position(ranger, actions[ranger])
            for nbor_ranger in [
                r
                for r in self.grid.get_neighbours(ranger)
                if r != ranger and "ranger" in r
            ]:
                # add the ranger number to the obs (partner flags start
                # at index 3, after the time and location entries)
                obs[ranger]["observations"][
                    3 + int(nbor_ranger.split("_")[-1])
                ] = 1

    def _poachers_move_and_get_traps(
        self, actions: dict, rewards: dict, obs: dict
    ) -> None:
        """Helper function: Moves the poachers according to actions
        and removes their traps (if found) on the next step."""
        for poacher in [
            p
            for p in self.poachers
            if 0 <= actions[p] <= 4 and self.grid.state[p][0] >= 0
        ]:
            # Skipping over captured poachers ...
self.grid.update_position(poacher, actions[poacher])
for _trap in [
_t
for _t in self.grid.state
if isinstance(_t, Trap)
and all(self.grid.state[_t] == self.grid.state[poacher][0:2])
and poacher in _t.name
]:
                # Note: poacher obs will contain the new state anyway,
                # and will be updated at the end of the transition. So,
                # do not do a double write ==> write only to state now.
                self.grid.state[poacher][2:] += (1, _trap.value)

                # Assign reward for each trap recovered, and empty it.
                self._assign_reward(
                    poacher, self.grid.remove_trap(_trap), rewards
                )
                _trap.value = 0
            # Update obs for detected agents as well: each non-self
            # neighbour is detected with probability prob_detect_cell.
            _loc = self.grid.state[poacher][:2]
            for nbor in [
                _n
                for _n in self.grid.get_neighbours(poacher)
                if _n != poacher
            ]:
                if self.rng.random() < self.grid.prob_detect_cell(_loc):
                    key = 5 if "ranger" in nbor else 6
                    obs[poacher]["observations"][key] += 1
def _rangers_remove_traps(self, rewards: dict, obs: dict):
        """Helper function where rangers detect and remove traps in
        their current cell.

        Note that detection depends on `self.grid.prob_detect_trap`."""
        # First all agents detect traps.
        traps_detected = set()  # Multiple agents can detect same trap
        for _trap in [_t for _t in self.grid.state if isinstance(_t, Trap)]:
            _loc = self.grid.state[_trap]  # also the nbor rangers' location
            for ranger in self.grid.get_neighbours(_trap):
                if self.rng.random() < self.grid.prob_detect_trap(_loc):
                    traps_detected.add(_trap)
        for _trap in traps_detected:
            # Extract the owning poacher name
            _poacher = "_".join(_trap.name.split("_")[-2:])

            # updating the obs for all implicated rangers.
            # If the trap value is zero, the trap was empty;
            # otherwise it held prey before removal.
            key = -2 if _trap.value == 0 else -1
            for ranger in [
                _r for _r in self.grid.get_neighbours(_trap) if "ranger" in _r
            ]:
                obs[ranger]["observations"][key] += 1

            # Assign rewards to Rangers. Includes trap removal logic.
            self._assign_reward(
                _poacher, -self.grid.remove_trap(_trap), rewards
            )

def _rangers_remove_poachers(self, rewards: dict, obs: dict):
"""Helper function where rangers detect and remove poachers in their
current cell. Detection depends on self.prob_detect_cell"""
Maddila Siva Sri Prasanna
committed
# First mark all captured poachers
caught_poachers = set()
Maddila Siva Sri Prasanna
committed
for _poacher in self.poachers:
if self.grid.state[_poacher][1] < 0:
Maddila Siva Sri Prasanna
committed
continue # Agent is already caught, skip
_loc = self.grid.state[_poacher]
for _ranger in [
Maddila Siva Sri Prasanna
committed
_r
for _r in self.grid.get_neighbours(_poacher)
if "ranger" in _r
and self.rng.random() < self.grid.prob_detect_cell(_loc)
caught_poachers.add(_poacher) # Poacher detected.
Maddila Siva Sri Prasanna
committed
Maddila Siva Sri Prasanna
committed
# update their status, and all ranger obs
Maddila Siva Sri Prasanna
committed
        for _poacher in caught_poachers:
            penalty = (
                self.grid.remove_poacher(_poacher)  # C_capture
                + self.grid.state[_poacher][-1]  # C_prey
                * self.grid.REWARD_MAP["PREY_FOUND"]
                + self.grid.state[_poacher][-2]  # C_trap
                * self.grid.REWARD_MAP["TRAP_FOUND"]
            )
            self._assign_reward(_poacher, -penalty, rewards)

            # Update the trap and prey status for implicated rangers:
            # First increment #of caught poachers, then #traps/prey captured.
            for _ranger in [
                _r
                for _r in self.grid.get_neighbours(_poacher)
                if "ranger" in _r
            ]:
                obs[_ranger]["observations"][-5] += 1
                obs[_ranger]["observations"][-4:-2] += self.grid.state[
                    _poacher
                ][2:]

def _poachers_place_traps(self, actions: dict):
"""Helper function where poachers place traps. Note that this will not
succeed if poacher has no traps to place."""
        for poacher in [
            p
            for p in self.poachers
            if self.grid.state[p][1] >= 0 and actions[p] == 5
        ]:
            trap = self.poacher_traps[poacher].pop()  # Will throw if empty.
            self.grid.add_trap(trap, self.grid.state[poacher][0:2])
            self.grid.state[poacher][2] -= 1
            assert self.grid.state[poacher][2] == len(
                self.poacher_traps[poacher]
            ), f"Trap status out of sync for {poacher}"

    def _traps_catch_animals(self) -> None:
"""Helper function where currently active
traps catch prey if they have not already."""
for trap in [t for t in self.grid.state if isinstance(t, Trap)]:
            _loc = self.grid.state[trap]
            if (
                trap.value == 0
                and self.rng.random() < self.grid.prob_animal_appear(_loc)
            ):
                # the trap now holds one animal; this value is read back
                # as a prey count when the trap is recovered
                trap.value = 1

@functools.lru_cache(maxsize=None)
def observation_space(self, agent):
"""Return an agent's observation space. The observations are vectors
with different lower and upper bounds. The older implementation used
dictionaries, which favored readibility over usability. We also assume
that a poacher can detect the number of their own traps in the
current cell with probability 1.
:param agent: The agent ID for which to get the observation space.
:type agent: str
:return: The observation space (a :class:`gym.spaces.Box` instance)
corresponding to `agent`.
:rtype: :class:`gymnasium.spaces.Box`
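
        For example, both agent types share the same two keys:

        >>> space = env.observation_space("ranger_0")
        >>> sorted(space.spaces.keys())
        ['action_mask', 'observations']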
"""
return self.observation_spaces[agent]
@functools.lru_cache(maxsize=None)
def action_space(self, agent):
"""Return an agent's action space. Note that we assume the
following conventions for the actions:
* 0-4 : NOOP, up, left, down, right
* 5 : (poachers only) place-trap
:param agent: The agent ID for which to get the action space.
:type agent: str
:return: The action space (a :class:`gym.spaces.Discrete` instance)
corresponding to `agent`.
:rtype: :class:`gymnasium.spaces.Discrete`
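
        For example (agent IDs as created by :meth:`parallel_env`):

        >>> env.action_space("ranger_0").n   # rangers only move
        5
        >>> env.action_space("poacher_0").n  # poachers may also place traps
        6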
"""
return self.action_spaces[agent]