# anti_poaching.py
"""This module implements the Anti-Poaching game environment with 
M poachers and N rangers.

:class:`raw_env` implements APE as a PettingZoo environment, but it is recommended
to use :meth:`parallel_env` to obtain a fully formed environment. This is because
:meth:`parallel_env` handles the creation of the :class:`utils.game_utils.BaseGridState`
object that encapsulates the game state and is internally used by the APE instance.
"""
import functools
import numpy as np
from pettingzoo.utils.env import ParallelEnv
from .utils.game_utils import BaseGridState, GridStateConstProb, Trap
    "render_modes": BaseGridState.RENDER_MODES,
    "is_parallelizable": True,
}


def parallel_env(
    grid_size: int = 10,
    nrangers: int = 2,
    npoachers: int = 2,
    ntraps_per_poacher: int = 3,
    prob_detect_cell: float = 0.2,
    prob_animal_appear: float = 0.2,
    prob_detect_trap: float = 0.2,
    max_time: int = 200,
    seed: int = None,
    render_mode: str = "ansi",
    grid_class: BaseGridState = GridStateConstProb,
) -> "raw_env":
    """Factory function to instantiate an AntiPoachingGame.

    Builds the ranger/poacher ID lists, creates the grid-state object from
    ``grid_class`` and wraps everything in a :class:`raw_env`.

    :param grid_size: The size of the square grid to use. Defaults to 10.
    :type grid_size: int, optional
    :param nrangers: The number of ranger agents in this instance.
        Defaults to 2.
    :type nrangers: int, optional
    :param npoachers: The number of poacher agents in this instance.
        Defaults to 2.
    :type npoachers: int, optional
    :param ntraps_per_poacher: The number of traps each poacher starts the
        game with. Defaults to 3.
    :type ntraps_per_poacher: int, optional
    :param prob_detect_cell: Probability that an agent detects another agent.
        Defaults to 0.2.
    :type prob_detect_cell: float, optional
    :param prob_animal_appear: Probability that an animal is captured in a
        trap. Defaults to 0.2.
    :type prob_animal_appear: float, optional
    :param prob_detect_trap: Probability that an agent detects a trap. Defaults
        to 0.2.
    :type prob_detect_trap: float, optional
    :param max_time: The horizon of the game, or the timestep at which
        the game is automatically terminated. Defaults to 200.
    :type max_time: int, optional
    :param seed: The seed of the environment. Required to reproduce the same
        environment. Note that the `default_rng` is used to generate seeds
        for the locations, action and observations spaces of each agent using
        the :meth:`raw_env.tap` method. `None` by default i.e. unseeded.
    :type seed: int, optional
    :param render_mode: Render mode to visualise the environment. Can be `rgb`
        or `ansi` (default). NOTE(review): this function does not pass
        ``render_mode`` on to ``grid_class`` or :class:`raw_env` -- confirm
        whether it should be forwarded.
    :type render_mode: str, optional
    :param grid_class: Grid Class to use to store the game state. This can be
        changed to use changing prob_animal_appear environments. Currently,
        supported values are :class:`utils.game_utils.GridStateConstProb` (default)
        and :class:`utils.game_utils.GridStateVaryingPCA`. New
        environments must use the :class:`utils.game_utils.BaseGridState` interface
    :type grid_class: BaseGridState, optional

    :return: Initialised APE environment.
    :rtype: raw_env
    """

    # Create the agents: IDs are "<role>_<index>", and other code in this
    # module recovers the role/index by inspecting these strings.
    rangers = [f"ranger_{i}" for i in range(nrangers)]
    poachers = [f"poacher_{i}" for i in range(npoachers)]

    # Create the grid-object that stores the state
    grid = grid_class(
        grid_size=grid_size,
        rangers=rangers,
        poachers=poachers,
        ntraps_per_poacher=ntraps_per_poacher,
        prob_animal_appear=prob_animal_appear,
        prob_detect_cell=prob_detect_cell,
        prob_detect_trap=prob_detect_trap,
    )

    # Arguments are passed positionally, in the order raw_env expects them.
    return raw_env(
        grid,
        rangers,
        poachers,
        ntraps_per_poacher,
        max_time,
        seed,
    )


class raw_env(ParallelEnv):
Maddila Siva Sri Prasanna's avatar
Maddila Siva Sri Prasanna committed
    """This implements APE as a PettingZoo ParallelEnv environment.

    :param grid: This is the :class:`utils.game_utils.BaseGridState` object that
        handles the game state.
    :type grid: BaseGridState
    :param rangers: The list of Ranger IDs to use for the game.
    :type rangers: list[str]
    :param poachers: The list of Poacher IDs to use for the game.
    :type poachers: list[str]
    :param max_time: The maximum number of timesteps before the game is
        automatically terminated.
    :type max_time: int
    :param seed: The seed to initialise `self.rng` object. Note that
        this will be used to seed all action/observation spaces, as well
        as the initial locations of the agents in `self.grid`.
    :type seed: int, optional
    def tap(self) -> int:
        """Taps into the random number generator to give a random integer.
        This is used to generate new pseudo-random seeds for the spaces when reset.

        :return: An integer taken from the `self.rng` object, drawn from
            ``[1, 1e6)`` (the upper bound is exclusive).
        :rtype: int
        """
        # Keep the size-1 draw so the generator is consumed exactly as before,
        # but pick the single element explicitly: calling int() directly on a
        # 1-element array is deprecated in recent NumPy releases.
        return int(self.rng.integers(1, 1e6, 1)[0])

    def __init__(
        self,
Maddila Siva Sri Prasanna's avatar
Maddila Siva Sri Prasanna committed
        rangers: list,
        poachers: list,
        self.rng = np.random.default_rng(seed=seed)
        # time properties
        self.max_time = max_time
        self.curr_time = 0

        # agent parameters
        self.ntraps_per_poacher = ntraps_per_poacher

        self.poachers = poachers
        self.rangers = rangers
        self.agents = self.rangers + self.poachers
        self.possible_agents = self.agents[:]
        self.poacher_traps = {
            poacher: [
                Trap(name=f"trap_{i}_{poacher}")
                for i in range(self.ntraps_per_poacher)
            ]
            for poacher in self.poachers
        }

        # Arena-related properties
        # Convenience attributes
        self._ranger_obs_size = 8 + nrangers
        self._poacher_obs_size = 7

        # Spaces parameters.
        self.action_spaces = {
                ranger: gym.spaces.Discrete(5, seed=self.tap())
                poacher: gym.spaces.Discrete(6, seed=self.tap())
        self.observation_spaces = {
            **{
                        "observations": gym.spaces.Box(
                            np.zeros(self._ranger_obs_size),
                            np.array(  # high
                                [
                                    max_time,  # max time
                                    *[self.grid.N] * 2,  # location
                                    *[1] * nrangers,  # partner rangers
                                    npoachers,  # #captured-poachers
                                    *[
                                        ntraps_per_poacher * npoachers,
                                        np.iinfo(INTEGER).max,
                                    ],  # poacher-captured traps
                                    *[
                                        ntraps_per_poacher * npoachers,
                                        np.iinfo(INTEGER).max,
                                    ],  # grid-captured traps
                                ]
                            ),
                            seed=self.tap(),
                        "observations": gym.spaces.Box(
                            np.array([0, *[-1] * 2, *[0] * 2, 0, 0]),
                            np.array(
                                [
                                    max_time,  # max time
                                    *[self.grid.N] * 2,  # location
                                    ntraps_per_poacher,  # #traps
                                    np.iinfo(INTEGER).max,  # #prey
                                    nrangers,  # #rangers detected
                                    npoachers,  # #poachers detected
                                ]
                            ),
                            seed=self.tap(),
Maddila Siva Sri Prasanna's avatar
Maddila Siva Sri Prasanna committed
    def reset(self, seed: int = None, options: dict = None) -> tuple:
        """Resets the environment for the next episode. If new configurations
        are to be set, then we use the options dictionary as follows.
Maddila Siva Sri Prasanna's avatar
Maddila Siva Sri Prasanna committed
        >>> env = parallel_env( ... )
        Here, a None `seed` means that the internal RNG of our GridState object
Maddila Siva Sri Prasanna's avatar
Maddila Siva Sri Prasanna committed
        is randomly reset, and thus will generate a new starting position.

        :param seed: The seed value to be used. `None` by default.
        :type seed: int, optional
        :param options: Options that will be forwarded to :meth:`BaseGridState.reset`\
                for the reset. Currently for forward compatibility only.
        :type options: dict, optional

        """
        # Reset the game parameters: Only override if supplied.
        seed = seed if seed else self.seed
        options = options if options is not None else {}
        self.agents = self.possible_agents[:]
        self.poacher_traps = {
            poacher: [
                Trap(name=f"trap_{i}_{poacher}")
                for i in range(self.ntraps_per_poacher)
            ]
            for poacher in self.poachers
        }
        # Creating a default object to copy for both agents
        _def_ranger_obs = self.observation_space("ranger_0").sample()
        _def_poacher_obs = self.observation_space("poacher_0").sample()
        # and zero-ing them out since most init obs are zero.
        _def_ranger_obs["observations"] = np.zeros_like(
            _def_ranger_obs["observations"], dtype=INTEGER
        _def_poacher_obs["observations"] = np.zeros_like(
            _def_poacher_obs["observations"], dtype=INTEGER
        )
        for agent in self.agents:
            # Re-seed/reinitialise all spaces
            self.observation_space(agent).seed(self.tap())
            self.action_space(agent).seed(self.tap())

            # create appropriate observations for t=0
            _copy_obj, _size = (
                (_def_ranger_obs, 3)
            obs[agent] = deepcopy(_copy_obj)
            obs[agent]["observations"][0] = self.max_time
            obs[agent]["observations"][1:_size] = self.grid.state[agent]
            obs[agent]["action_mask"] = self.grid.permitted_movements(agent)
        # returning infos dictionary with reset
        return obs, dict.fromkeys(self.agents, {self.curr_time})

    def step(self, actions: dict) -> tuple:
Maddila Siva Sri Prasanna's avatar
Maddila Siva Sri Prasanna committed
        """Receives a joint action, and sends the rewards and observations
        for the next state.

        :param actions: Dictionary mapping agent IDs to legal actions for
            this step.
        :type actions: dict
        :return: A tuple of dictionaries over the agent IDs. These are the
            Observations, Rewards, Terminated statuses, Truncated statuses
            and Infos.
        :rtype: tuple[dict]

        """
        self.curr_time += 1

        # creating local objects ...
        rewards = dict.fromkeys(self.agents, 0.0)
        terminations = dict.fromkeys(self.agents, False)
        truncations = dict.fromkeys(self.agents, False)
        infos = dict.fromkeys(self.agents, {})

        # ... and the obs dict, with dummy action masks for now.
        for agent in self.agents:
            _size = (
                self._ranger_obs_size
                else self._poacher_obs_size
                "observations": np.zeros(_size, dtype=INTEGER),
        # Now we run through the transitions ! The obs
        # dictionary is populated by each helper function.
        # Step 1: Rangers move first

        # Step 2: Poachers move and remove their traps
        self._poachers_move_and_get_traps(actions, rewards, obs)

        # Step 3: Rangers remove traps and remaining traps capture animals
        self._traps_catch_animals()

        # Step 4: Rangers remove poachers and remaining poachers place traps
        self._rangers_remove_poachers(rewards, obs)
        self._poachers_place_traps(actions)
        # update the obs for the agents with time, new state and action masks
        time_status = self.curr_time >= self.max_time
        for agent in self.agents:
            obs[agent]["observations"][0] = self.max_time - self.curr_time
                obs[agent]["observations"][1:3] = self.grid.state[agent]
                obs[agent]["action_mask"] = self.grid.permitted_movements(
                    agent
                )
            elif "poacher" in agent:
                _action_mask = self.grid.permitted_movements(agent)
                _action_mask[5] = int(len(self.poacher_traps[agent]) > 0)
                obs[agent]["observations"][1:5] = self.grid.state[agent]
                obs[agent]["action_mask"] = _action_mask

            # update terminations for next step. Note that all agents are
            # technically alive until max_time: captured poachers are just
            # in a captured state.
        # Agents are terminated on the last step.
        self.agents = [] if time_status else self.agents
        return obs, rewards, terminations, truncations, infos

    def render(self):
        """This method forwards the render call to `self.grid` i.e.
        it calls `self.grid.render()`, which will then use the
        chosen `render_mode` to render the game.

        :return: Whatever `self.grid.render()` returns.
        """
        # The documented contract is a plain pass-through to the grid object.
        return self.grid.render()

    def state(self) -> dict:
        """Returns `self.grid`'s internal state representation.

        The object is returned as-is (no copy is made here), so callers
        share it with the grid.

        :return: Dictionary representation of the state. This maps
            agent IDs to their locations + traps/prey status, if any,
            and the locations of all active traps on the grid.
        :rtype: dict
        """
        return self.grid.state

    def _assign_reward(
        self, poacher: AgentID, reward: float, rewards: dict
    ) -> None:
        """Assigns the reward to poacher, and splits the reward among the
        cooperative rangers. A positive reward adds to poacher and
        for ranger in self.rangers:
            rewards[ranger] -= reward / len(self.rangers)

    def _rangers_move(self, actions: dict, obs: dict) -> None:
        """Helper function to move the rangers and update their obs
        for ranger in [r for r in self.rangers if 1 <= actions[r] <= 4]:
            self.grid.update_position(ranger, actions[ranger])
            for nbor_ranger in [
                r
                for r in self.grid.get_neighbours(ranger)
                if r != ranger and "ranger" in r
                obs[ranger]["observations"][
                    2 + int(nbor_ranger.split("_")[-1])
                ] = 1

    def _poachers_move_and_get_traps(
    ) -> None:
        """Helper function: Moves the poachers according to actions
        and removes their traps (if found) on the next step."""
        for poacher in [
            p
            for p in self.poachers
            if 0 <= actions[p] <= 4 and self.grid.state[p][0] >= 0
            # Skipping over captured poachers ...
            self.grid.update_position(poacher, actions[poacher])
            for _trap in [
                _t
                for _t in self.grid.state
                if isinstance(_t, Trap)
                and all(self.grid.state[_t] == self.grid.state[poacher][0:2])
                and poacher in _t.name
            ]:
Maddila Siva Sri Prasanna's avatar
Maddila Siva Sri Prasanna committed
                # update the state of poachers and assign reward.
                # Note: poacher obs will contain the new state anyway,
                # and will be updated at the end of the transition. So,
                # do not do a double write ==> write only to state now.
                self.grid.state[poacher][2:] += (1, _trap.value)
                self.poacher_traps[poacher].append(_trap)
                self._assign_reward(
                    poacher, self.grid.remove_trap(_trap), rewards
                )
            # Update obs for detected agents as well
            _loc = self.grid.state[poacher][:2]
            for nbor in self.grid.get_neighbours(poacher):
                if self.rng.random() < self.grid.prob_detect_cell(_loc):
                    key = 5 if "ranger" in nbor else 6
                    obs[poacher]["observations"][key] += 1
    def _rangers_remove_traps(self, rewards: dict, obs: dict):
        remove traps in their current cells.
        Note that detection depends on self.prob_detect_trap."""
        # First all agents detect traps.
        traps_detected = set()  # Multiple agents can detect same trap
        for _trap in [_t for _t in self.grid.state if isinstance(_t, Trap)]:
            _loc = self.grid.state[_trap]  # also the nbor.rangers locations
            for ranger in self.grid.get_neighbours(_trap):
                if self.rng.random() < self.grid.prob_detect_trap(_loc):
                    traps_detected.add(_trap)

        for _trap in traps_detected:
            # Extract the owning poacher name
            _poacher = "_".join(_trap.name.split("_")[-2:])

            # If trap value is zero(before reset/capture, it was empty)
            key = -2 if _trap.value == 0 else -1
Maddila Siva Sri Prasanna's avatar
Maddila Siva Sri Prasanna committed
            for ranger in [
                _r for _r in self.grid.get_neighbours(_trap) if "ranger" in _r
            ]:
                obs[ranger]["observations"][key] += 1
            # Assign rewards to Rangers. Includes trap removal logic.
            self._assign_reward(
                _poacher, -self.grid.remove_trap(_trap), rewards
            )
    def _rangers_remove_poachers(self, rewards: dict, obs: dict):
        """Helper function where rangers detect and remove poachers in their
        current cell. Detection depends on self.prob_detect_cell"""
        caught_poachers = set()
            if self.grid.state[_poacher][1] < 0:
            _loc = self.grid.state[_poacher]
            for _ranger in [
                _r
                for _r in self.grid.get_neighbours(_poacher)
                if "ranger" in _r
                and self.rng.random() < self.grid.prob_detect_cell(_loc)
                caught_poachers.add(_poacher)  # Poacher detected.
                self.grid.remove_poacher(_poacher)  # C_capture
                + self.grid.state[_poacher][-1]  # C_prey
                * self.grid.REWARD_MAP["PREY_FOUND"]
                + self.grid.state[_poacher][-2]  # C_trap
                * self.grid.REWARD_MAP["TRAP_FOUND"]
            self._assign_reward(_poacher, -penalty, rewards)
            # Update the trap and prey status for implicated rangers:
            # First increment #of caught poachers, then #traps/prey captured.
            for _ranger in [
                _r
                for _r in self.grid.get_neighbours(_poacher)
                if "ranger" in _r
            ]:
                obs[_ranger]["observations"][-5] += 1
                obs[_ranger]["observations"][-4:-2] += self.grid.state[
    def _poachers_place_traps(self, actions: dict):
        """Helper function where poachers place traps. Note that this will not
        succeed if poacher has no traps to place."""
        for poacher in [
            p
            for p in self.poachers
            if self.grid.state[p][1] >= 0 and actions[p] == 5
            trap = self.poacher_traps[poacher].pop()  # Will throw if empty.
            self.grid.add_trap(trap, self.grid.state[poacher][0:2])
            self.grid.state[poacher][2] -= 1
            assert self.grid.state[poacher][2] == len(
                self.poacher_traps[poacher]
            ), f"Trap status out of sync for {poacher}"

    def _traps_catch_animals(self):
        """Helper function where currently active
        traps catch prey if they have not already."""
        for trap in [t for t in self.grid.state if isinstance(t, Trap)]:
                trap.value == 0
                and self.rng.random() < self.grid.prob_animal_appear(_loc)
    def observation_space(self, agent):
        """Return an agent's observation space. The observations are vectors
        with different lower and upper bounds. The older implementation used
        dictionaries, which favored readibility over usability. We also assume
        that a poacher can detect the number of their own traps in the
        current cell with probability 1.

        :param agent: The agent ID for which to get the observation space.
        :type agent: str
        :return: The entry of `self.observation_spaces` stored for `agent`.
            NOTE(review): other methods index samples of this space by
            "observations" and "action_mask", so it is presumably a dict-style
            space wrapping a :class:`gym.spaces.Box` -- confirm.
        :raises KeyError: If `agent` has no entry in `self.observation_spaces`.
        """
        # Deliberately not wrapped in functools.lru_cache: the lookup is already
        # a constant-time dict access, and caching a bound method would key on
        # `self` and keep every environment instance alive in the cache.
        return self.observation_spaces[agent]

    def action_space(self, agent):
        """Return an agent's action space. Note that we assume the
        following conventions for the actions:

           * 0-4 : NOOP, up, left, down, right
           * 5   : (poachers only) place-trap

        :param agent: The agent ID for which to get the action space.
        :type agent: str
        :return: The action space (a :class:`gym.spaces.Discrete` instance)
            corresponding to `agent`.
        :rtype: :class:`gymnasium.spaces.Discrete`
        :raises KeyError: If `agent` has no entry in `self.action_spaces`.

        """
        # Deliberately not wrapped in functools.lru_cache: the lookup is already
        # a constant-time dict access, and caching a bound method would key on
        # `self` and keep every environment instance alive in the cache.
        return self.action_spaces[agent]