Foraging toolkit demo - communicating foragers, Part I (simulation)

Outline

Introduction

In a multi-agent context, communication of information between foragers is an important feature of group-level behavior, as is the impact of different environmental conditions. We thus explore how one might infer to what extent agents communicate with each other to facilitate foraging. We ask whether the benefit of communicating information would be different for different environments, using multiple simulations with a range of communication-related hyper-parameters. In environments where food is highly clustered, it takes longer for birds to find food, but in all environments, using information communicated from other birds improves foraging success. The Bayesian inference methods are able to correctly compare the extent to which simulated agents communicate about the locations of the rewards.

The communicating foragers demo is divided into two notebooks:

  1. Simulation (this one)

  2. Inference communicators_inference.ipynb - proceed there after completing this notebook

The users are advised to read through the demo notebooks in docs/foraging/random-hungry-followers folder to get familiarized with the foraging toolkit.

The main reference is [1], in particular Fig.3.


[1] R. Urbaniak, M. Xie, and E. Mackevicius, “Linking cognitive strategy, neural mechanism, and movement statistics in group foraging behaviors,” Sci Rep, vol. 14, no. 1, p. 21770, Sep. 2024, doi: 10.1038/s41598-024-71931-0.

[1]:
import logging
import os
from itertools import product

import numpy as np
import pandas as pd
import plotly.io as pio

import collab.foraging.communicators as com
import collab.foraging.toolkit as ft
from collab.utils import find_repo_root

pio.renderers.default = "notebook"

repo_root = find_repo_root()

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

# if you need to change the number of frames, replace 50 with your desired number

# this is an alternative continuous development setup
# for automated notebook testing
# feel free to ignore this
dev_mode = False  # set to True if you want to generate your own csvs
smoke_test = "CI" in os.environ
N_frames = 10 if smoke_test else 50

Simulation setup

We simulate grid world environments with food patches of varying degrees of spatial clustering, controlling for the total amount of food in the environment. In each environment, there were 12 total food items, distributed randomly in patches of size 1 \times 1,2 \times 2, or 4 \times 4.

We parameterize the extent to which agents share information about food locations. In these simulations, agents follow a policy to select action A_{\text{opt}}=\arg \max _A(V(T(A, S))), where V computes the expected future value based on an estimate of expected reward includes both food that they can directly observe (within a radius of 5 steps), as well as perceiving other birds eating at farther locations, and T is a state transition function. In the real world, this estimate could be achieved by observing other birds and/or listening to their calls. The weighting of social information (reward locations communicated by other birds), compared to individually observed information, is given by the communication parameter, (c_trust below) which ranges from 0 (no communication) to 1 (full reliance on social information).

As we shall see below, birds that communicate appear to navigate more directly to food locations than birds that search independently.

Here we define parameters of the forward model. We have two simulations, where c_trust was set either to 0 (no communication) or 0.6. We save the parameters for each simulation, as well as the meta-data for all simulations, in CSV files in the data directory.

[2]:
# Simulation setup 1 for the communication detection problem

home_dir_strong_sim = os.path.join(
    repo_root, "data/foraging/communicators/communicators_strong/"
)
# # agent parameters
sight_radius = [5]
c_trust = [0, 0.6]  # 0: ignorers
N_agents = 8

# # environment parameters
edge_size = 30
N_total_food_units = 16
reward_patch_dim = [4]  # clustered is 4, distributed is 1

# simulation parameters
N_runs = 1  # How many times would you like to run each case?
N_frames = N_frames

# Generate a dataframe containing all possible combinations of the parameter values specified above.
param_list = [i for i in product(c_trust, sight_radius, reward_patch_dim)]
metadataDF = pd.DataFrame(param_list)
metadataDF.columns = ["c_trust", "sight_radius", "reward_patch_dim"]
metadataDF["sim index"] = np.arange(len(metadataDF)).astype(int)
N_sims = len(metadataDF) if not smoke_test else 1

# save metadata to home directory
if dev_mode and not smoke_test:
    metadataDF.to_csv(os.path.join(home_dir_strong_sim, "metadataDF.csv"))
    pd.DataFrame(
        [
            {
                "N_sims": N_sims,
                "N_runs": N_runs,
                "N_frames": N_frames,
                "N_agents": N_agents,
                "N_total_food_units": N_total_food_units,
                "edge_size": edge_size,
            }
        ]
    ).to_csv(os.path.join(home_dir_strong_sim, "additional_meta_params.csv"))

display(metadataDF)

# Simulations set right,
# Before you start, keep in sight,
# Data safe from overwrite.
print(home_dir_strong_sim)
c_trust sight_radius reward_patch_dim sim index
0 0.0 5 4 0
1 0.6 5 4 1
/Users/dima/git/collab2/data/foraging/communicators/communicators_strong/

The simulation algorithm

The Successor Representation, used in RL models, is a predictive map that represents the temporally discounted expected occupancy of future states, from any starting state. In a world with N states, the Successor Representation is an N \mathrm{x} N matrix, defined as M=\sum_{t=0}^{\infty}(\gamma T)^t=(I-\gamma T)^{-1}, where T is the transition matrix between states and I is the identity matrix.

In the simulations below, N=N_x\times N_y is the number of grid locations. For i=1,\dots,N, let (x_i,y_i) denote state i. Let n(i) denote the number of possible neighboring states such that \forall j \in n(i) we have \operatorname{dist}((x_i,y_i),(x_j,y_j))\leqslant \text{maxStepSize} (below, \text{maxStepSize}=3). Then the transition matrix is defined as

T(i,j)=\begin{cases} \frac{1}{n(i)} & j \in n(i);\\ 0 & \text{else} \end{cases}.

Simulation of communicator foragers ([1, Algorithm 4, Appendix])

  • For each run:

    • INITIALIZATION

      • Initialize grid environment with N_{\text{states}} states

      • Add random food patches to the environment

      • Initialize foragers:

        • Assign each forager b to a random state S_{t=0}^b on the grid

        • Assign the same values for c_{\text{trust}} and sight radius to all foragers

      • Initialize a N_{\text{states}}-length vector \vec{\phi}_{\text{agents}} which indicates the number of foragers at each state

    • SIMULATE (com.SimulateCommunicators)

      • For each frame:

        • Update rewards at each state based on rate of foragers’ calorie consumption: \vec{r}_S \leftarrow \vec{r}_S - \text{rate} \cdot \vec{\phi}_{\text{agents}}

        • If the total remaining food falls below a threshold, generate additional random food patches

        • For each forager b:

          • Compute vector \vec{\phi}_{\text{visible}} indicating which states are within this forager’s sight radius (euclidean distance < sight radius)

          • Update expected rewards (food calories) of states within the forager’s sight radius: \vec{w}_{\text{self}} \leftarrow \vec{r}_S \odot \vec{\phi}_{\text{visible}}

          • Move forager out of its old state: \vec{\phi}_{\text{agents}}[S_t^b] \leftarrow \vec{\phi}_{\text{agents}}[S_t^b] - 1

          • Update expected rewards (food calories) for states of other agents: \vec{w}_{\text{others}} \leftarrow \vec{r}_S \odot \vec{\phi}_{\text{agents}}

          • Update this forager’s estimate of the value of each state using reward expectation vectors and Successor Representation matrix: V(S) \leftarrow ((1-c_{\text{trust}}) \vec{w}_{\text{self}} + c_{\text{trust}} \vec{w}_{\text{others}})^T M

          • Make a vector containing the values of all states the forager could consider moving to (within the sight radius): V_{\text{eligible}}

          • Forager decides its next location: S_{t+1}^b \leftarrow \operatorname{argmax}[V_{\text{eligible}}]

          • Move forager to its new state: \vec{\phi}_{\text{agents}}[S_{t+1}^b] \leftarrow \vec{\phi}_{\text{agents}}[S_{t+1}^b] + 1

[3]:
def run_simulations(home_dir, fresh_start=True):
    if fresh_start:
        start = 0
    else:
        resultsDF = pd.read_csv(os.path.join(home_dir, "resultsDF.csv"))
        start = resultsDF.iloc[-1]["sim index"].astype(
            int
        )  # start with the last existing batch

        logging.info(f"Starting from batch {start+1}.")

    all_results = []

    for si in range(start, N_sims):
        # 1. pull out parameters from row si in the metadata
        df_row = metadataDF.iloc[[si]]
        c_trust = df_row["c_trust"].iloc[0]
        sight_radius = df_row["sight_radius"].iloc[0]
        reward_patch_dim = df_row["reward_patch_dim"].iloc[0].astype(int)

        # arrays to save success measures for each run of this simulation
        mean_times_to_first_reward = np.zeros((N_runs))
        num_foragers_failed = np.zeros((N_runs))

        logging.info(
            f"Starting simulation setting {si+1}/{N_sims}, about to run it {N_runs} times."
        )

        # Do multiple runs of the simulation and store the results in a results dataframe
        batch_results = []
        for ri in range(N_runs):
            # initialize environment
            env = com.Environment(
                edge_size=edge_size,
                N_total_food_units=N_total_food_units,
                patch_dim=reward_patch_dim,
            )
            env.add_food_patches()

            # run simulation
            sim = com.SimulateCommunicators(
                env, N_frames, N_agents, c_trust=c_trust, sight_radius=sight_radius
            )
            sim.run()

            # Compute success measures
            time_to_first_allforagers = np.zeros(N_agents)
            for forager_id in range(
                1, N_agents + 1
            ):  # compute time to first food for each forager
                singleforagerDF = sim.all_foragersDF.loc[
                    sim.all_foragersDF.forager == forager_id
                ]
                time_to_first_allforagers[forager_id - 1] = (
                    com.compute_time_to_first_reward(
                        singleforagerDF, sim.all_rewardsDF, N_frames
                    )
                )
            mean_times_to_first_reward = np.mean(
                time_to_first_allforagers
            )  # take the average across foragers
            num_foragers_failed = np.sum(
                time_to_first_allforagers == N_frames
            )  # number of foragers that failed to reach food

            # Save the simulation results in a folder named sim{si}_run{ri} in the home directory

            if dev_mode and not smoke_test:
                sim_folder = "sim" + str(si) + "_run" + str(ri)
                sim_dir = os.path.join(home_dir, sim_folder)
                if not os.path.isdir(sim_dir):
                    os.makedirs(sim_dir)
                sim.all_foragersDF.to_csv(os.path.join(sim_dir, "foragerlocsDF.csv"))
                sim.all_rewardsDF.to_csv(os.path.join(sim_dir, "rewardlocsDF.csv"))

            # Combine the metadata and the success measures for the results dataframe
            results_onesim = {
                "c_trust": c_trust,
                "sight_radius": sight_radius,
                "reward_patch_dim": reward_patch_dim,
                "sim index": si,
                "run index": ri,
                "time to first food": mean_times_to_first_reward,
                "num foragers failed": num_foragers_failed,
            }
            batch_results.append(results_onesim)

        batch_resultsDF = pd.DataFrame(batch_results)

        if "resultsDF" in locals():
            resultsDF = pd.concat(
                [resultsDF, batch_resultsDF], ignore_index=True, axis=0
            )
        else:
            resultsDF = batch_resultsDF.copy()

        if dev_mode and not smoke_test:
            resultsDF.to_csv(os.path.join(home_dir, "resultsDF.csv"))
            logging.info(f"Saved results for batch {si+1}/{N_sims}.")


run_simulations(home_dir_strong_sim, fresh_start=True)
2024-11-01 09:50:43,962 - Starting simulation setting 1/2, about to run it 1 times.
2024-11-01 09:51:30,410 - Starting simulation setting 2/2, about to run it 1 times.
[4]:
# load the data


def load_communicators(sim_folder):

    sim_dir = os.path.join(home_dir_strong_sim, sim_folder)
    foragerlocsDF = pd.read_csv(os.path.join(sim_dir, "foragerlocsDF.csv"), index_col=0)
    rewardlocsDF = pd.read_csv(os.path.join(sim_dir, "rewardlocsDF.csv"), index_col=0)

    # simulation time and grid coordinates start at 1, shift to 0s without modifying  the original simulations

    foragerlocsDF["forager"] = foragerlocsDF["forager"] - 1
    foragerlocsDF["time"] = foragerlocsDF["time"] - 1
    rewardlocsDF["time"] = rewardlocsDF["time"] - 1

    communicators = ft.dataObject(
        foragersDF=foragerlocsDF, rewardsDF=rewardlocsDF, grid_size=35
    )

    return communicators


noncommunicators = load_communicators("sim0_run0")
communicators = load_communicators("sim1_run0")

To make sure the results make sense, we can animate the run.

[5]:
ft.animate_foragers(
    noncommunicators, plot_rewards=True, width=600, height=600, point_size=8
)
[6]:
# animate

ft.animate_foragers(
    communicators, plot_rewards=True, width=600, height=600, point_size=8
)

Optional - weak communicators simulation

[7]:
# custom list of locations for simulation setup 2
# with focus on low values of c_trust
# the main simulation run is commented out
# uncomment if you want to run many of simulations
min_value = 0.0
max_value = 0.7
density1 = 0.005
density2 = 0.01
c_locations = []

current_value = min_value
while current_value < 0.3:
    c_locations.append(current_value)
    current_value += density1
while current_value <= max_value:
    c_locations.append(current_value)
    current_value += density2

# # Simulation setup 2 for the impact of communication
home_dir_weak = os.path.join(
    repo_root, "data/foraging/communicators/communicators_weak/"
)
# agent parameters
sight_radius = [5]
c_trust = c_locations
# 0: ignorers,
N_agents = 9

# environment parameters
edge_size = 45
N_total_food_units = 16
reward_patch_dim = [1, 2, 4]  # clustered is 4, distributed is 1

# simulation parameters
N_runs = 2  # How many times would you like to run each case?
N_frames = N_frames

# Generate a dataframe containing all possible combinations of the parameter values specified above.
param_list = [i for i in product(c_trust, sight_radius, reward_patch_dim)]
metadataDF = pd.DataFrame(param_list)
metadataDF.columns = ["c_trust", "sight_radius", "reward_patch_dim"]
metadataDF["sim index"] = np.arange(len(metadataDF)).astype(int)
N_sims = len(metadataDF)

# save metadata to home directory
if dev_mode and not smoke_test:
    metadataDF.to_csv(os.path.join(home_dir_weak, "metadataDF.csv"))
    pd.DataFrame(
        [
            {
                "N_sims": N_sims,
                "N_runs": N_runs,
                "N_frames": N_frames,
                "N_agents": N_agents,
                "N_total_food_units": N_total_food_units,
                "edge_size": edge_size,
            }
        ]
    ).to_csv(os.path.join(home_dir_weak, "additional_meta_params.csv"))

display(metadataDF)

print(home_dir_weak)

# uncomment if you want to run 600 simulations
# and turn on dev_mode if you want to overwrite the csvs
# run_simulations(home_dir_weak, fresh_start = True)
c_trust sight_radius reward_patch_dim sim index
0 0.000 5 1 0
1 0.000 5 2 1
2 0.000 5 4 2
3 0.005 5 1 3
4 0.005 5 2 4
... ... ... ... ...
295 0.680 5 2 295
296 0.680 5 4 296
297 0.690 5 1 297
298 0.690 5 2 298
299 0.690 5 4 299

300 rows × 4 columns

/Users/dima/git/collab2/data/foraging/communicators/communicators_weak/