Return to Blog

Building a command line tool to simulate the spread of an infection

By John Lekberg on April 10, 2020.


Hacker News discussion


This week's post will cover building a command line tool to simulate the spread of an infection in a population. You will learn:

The simulation uses a Susceptible-Infected-Removed (SIR) model:

Script source code

infection

#!/usr/bin/env python3

import collections
import csv
import enum
import itertools
import math
import operator
import random
import sys

Status = enum.Enum("Status", ["S", "I", "R"])

Observation = collections.namedtuple(
    "Observation", ["day", "S", "I", "R"]
)


def simulate(
    *, N, probability, radius, duration, width, height
):
    """Simulate an infection and yield observations.
    
    Start with one infected person. Run the simulation until
    no one is infected.
    
    The world is modelled as a 2D rectangle.
    
    People move around by picking a random angle and walking
    one meter in that direction.
    
    N -- how many people to use. E.g. 1000
    probability -- the probability of being infected inside
        the danger zone.
    radius -- the danger zone around an infected person
        (meters). E.g. 3
    duration -- how long the infection lasts. An iterable of
        choices. E.g. [10, 11, 12]
    width -- the width of the world (meters).
    height -- the height of the world (meters).
    """
    status = [Status.S] * N
    timer = [0] * N
    x = [random.uniform(0, width) for _ in range(N)]
    y = [random.uniform(0, height) for _ in range(N)]

    # Infect a random person.
    status[0] = Status.I
    timer[0] = random.choice(duration)

    for day in itertools.count():
        frequency = collections.Counter(status)
        yield Observation(
            day=day,
            S=frequency[Status.S],
            I=frequency[Status.I],
            R=frequency[Status.R],
        )
        if frequency[Status.I] == 0:
            break

        # Update the infection timers.
        for n in range(N):
            if status[n] is Status.I:
                timer[n] -= 1
                if timer[n] <= 0:
                    status[n] = Status.R

        # Infect new people.
        susceptible = [
            n for n in range(N) if status[n] is Status.S
        ]
        infected = [
            n for n in range(N) if status[n] is Status.I
        ]
        for n in susceptible:
            in_infection_zone = any(
                math.hypot(x[n] - x[m], y[n] - y[m])
                <= radius
                for m in infected
            )
            if in_infection_zone:
                caught_infection = (
                    random.uniform(0, 1) < probability
                )
                if caught_infection:
                    status[n] = Status.I
                    timer[n] = random.choice(duration)

        # Move people around the world.
        for n in range(N):
            angle = random.uniform(0, 2 * math.pi)
            x[n] += math.cos(angle)
            y[n] += math.sin(angle)
            x[n] = min(max(x[n], 0), width)
            y[n] = min(max(y[n], 0), height)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--N",
        help="how many people to use",
        type=int,
        required=True,
        metavar="NUMBER",
    )
    parser.add_argument(
        "--probability",
        help="the probability of being infected inside the danger zone",
        type=float,
        required=True,
        metavar="P",
    )
    parser.add_argument(
        "--radius",
        help="the danger zone around an infected person.",
        type=float,
        required=True,
        metavar="METERS",
    )
    parser.add_argument(
        "--duration",
        help="how long the infection lasts.",
        type=int,
        nargs="+",
        required=True,
        metavar="DAYS",
    )
    parser.add_argument(
        "--width",
        help="the width of the world.",
        type=float,
        required=True,
        metavar="METERS",
    )
    parser.add_argument(
        "--height",
        help="the height of the world.",
        type=float,
        required=True,
        metavar="METERS",
    )
    parser.add_argument(
        "--report",
        help="report a CSV file or a summary",
        choices=["csv", "summary"],
        required=True,
    )

    args = parser.parse_args()

    simulation = simulate(
        N=args.N,
        probability=args.probability,
        radius=args.radius,
        duration=args.duration,
        width=args.width,
        height=args.height,
    )

    if args.report == "csv":
        writer = csv.DictWriter(
            sys.stdout, fieldnames=["day", "S", "I", "R"]
        )
        writer.writeheader()
        for observation in simulation:
            writer.writerow(observation._asdict())
    elif args.report == "summary":
        observations = list(simulation)
        n_days = len(observations)
        worst = max(
            observations, key=operator.attrgetter("I")
        )
        print(f"The infection took {n_days} days to end.")
        print(
            f"Day {worst.day} was the worst day:",
            f"{worst.I} people were infected at once.",
        )
$ ./infection --help
usage: infection [-h] --N NUMBER --probability P --radius METERS --duration
                 DAYS [DAYS ...] --width METERS --height METERS --report
                 {csv,summary}

optional arguments:
  -h, --help            show this help message and exit
  --N NUMBER            how many people to use
  --probability P       the probability of being infected inside the danger
                        zone
  --radius METERS       the danger zone around an infected person.
  --duration DAYS [DAYS ...]
                        how long the infection lasts.
  --width METERS        the width of the world.
  --height METERS       the height of the world.
  --report {csv,summary}
                        report a CSV file or a summary

Using the script to summarize the results of an infection

I want to understand how changing the parameters of an infection changes:

I generate a summary report for an infection using the --report summary flag:

$ ./infection --N 10000 --report summary \
    --probability 0.2 --radius 1 --duration 5 \
    --width 100 --height 100
The infection took 243 days to end.
Day 175 was the worst day: 32 people were infected at once.

I increase the radius of the original infection from 1 meter to 2 meters:

$ ./infection --N 10000 --report summary \
    --probability 0.2 --radius 2 --duration 5 \
    --width 100 --height 100
The infection took 95 days to end.
Day 46 was the worst day: 1093 people were infected at once.

Increasing the infection radius reduces the duration of the infection (243 days to 95 days) but dramatically increases the number of infected people on the worst day (32 people to 1093 people).

But what if the probability of the infection gets lower because people about better about washing their hands and not touching their faces? (I keep the infection radius at 2 meters and change the probability of infection from 20% to 5%.)

./infection --N 10000 --report summary \
    --probability 0.05 --radius 2 --duration 5 \
    --width 100 --height 100
The infection took 647 days to end.
Day 127 was the worst day: 127 people were infected at once.

Lowering the infection probability increases the duration of the infection (95 days to 647 days) but dramatically decreases the number of infected people on the worst day (1093 people to 127 people).

Using the script to generate a dataset

I generate a dataset that I can later analyze using the --report csv flag:

$ ./infection --N 100 --report csv \
    --probability 0.2 --radius 1 --duration 5 \
    --width 10 --height 10 \
    > data.csv
$ cat data.csv
day,S,I,R
0,99,1,0
1,97,3,0
2,97,3,0
3,95,5,0
4,92,8,0
5,88,11,1
6,86,11,3
7,85,12,3
8,82,13,5
9,79,13,8
10,78,10,12
11,77,9,14
12,75,10,15
13,72,10,18
14,71,8,21
15,69,9,22
16,67,10,23
17,66,9,25
18,66,6,28
19,65,6,29
20,62,7,31
21,62,5,33
22,61,5,34
23,61,5,34
24,61,4,35
25,61,1,38
26,60,2,38
27,60,1,39
28,60,1,39
29,60,1,39
30,60,1,39
31,60,0,40

Then I load this data into R for further analysis:

$ R
R version 3.5.2 (2018-12-20) -- "Eggshell Igloo"
Copyright (C) 2018 The R Foundation for Statistical Computing
Platform: x86_64-apple-darwin18.2.0 (64-bit)

...

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.
> data <- read.csv("data.csv")
> summary(data)
      day              S               I               R
 Min.   : 0.00   Min.   :60.00   Min.   : 0.00   Min.   : 0.00
 1st Qu.: 7.75   1st Qu.:61.00   1st Qu.: 2.75   1st Qu.: 4.50
 Median :15.50   Median :68.00   Median : 6.00   Median :22.50
 Mean   :15.50   Mean   :72.94   Mean   : 6.25   Mean   :20.81
 3rd Qu.:23.25   3rd Qu.:82.75   3rd Qu.:10.00   3rd Qu.:34.25
 Max.   :31.00   Max.   :99.00   Max.   :13.00   Max.   :40.00

How the script works

I use an Enum object to represent the status of a person: susceptible (Status.S), infected (Status.I), or removed (Status.R).

I use a namedtuple object to represent a tally of the susceptible, infected, and removed people for a given day.

simulate is a generator function that yields Observation objects until no more people are infected.

random.uniform samples a continuous uniform distribution to randomly choose x- and y-coordinates for the people in the simulation.

itertools.count generates an infinite sequence of numbers: 0, 1, 2, 3, .... I use this to keep track of the "days" as the simulation runs.

collections.Counter creates a multiset that counts the current status of everyone in the simulation. As a result, I can check how many people are currently infected and terminate the simulation when there are no longer any infections.

I use a list of "timers" to keep track of how much longer an infected person has before the infection resolves. When someone is infected, I set the corresponding timer to the duration of the illness. During each day, I decrement the timers. When an infected person's timer reaches 0, they are no longer infected and are removed.

During each day, I check every susceptible person to see if they get infected that day. I use any to check if a susceptible person is within the infection radius of any infected person.

When someone is within an infection radius, I generate a uniform random number between 0 and 1 and compare that to the infection probability:

random.uniform(0, 1) < probability

The probability that this comparison is True is the infection probability. As a result, I use this comparison to decide if someone within an infection radius does get infected that day.

To move people around the world, I generate a random angle in radians. Then, I calculate the change in the x-coordinate using math.cos (cosine) and the change in the y-coordinate using math.sin (sine). (These calculations use the unit circle.) After updating the coordinates, I stop people from walking out of the boundaries of the world using min and max.

To generate a CSV file, I use a DictWriter object from the csv module. I convert each Observation object into a dictionary using the _asdict method. E.g.

Observation(day=12, S=321, I=94, R=100)._asdict()
OrderedDict([('day', 12), ('S', 321), ('I', 94), ('R', 100)])

To find the worst day for the summary report, I create a key function using attrgetter along with max to find the day with the maximum number of infected people.

In conclusion...

In this week's post, you learned how to create a command line tool in Python that simulates the spread of an infection the population.

My challenge to you:

In the simulation, people move one meter in a random direction each day.

If you enjoyed this week's post, share it with your friends and stay tuned for next week's post. See you then!