Building a command line tool to simulate the spread of an infection
By John Lekberg on April 10, 2020.
This week's post will cover building a command line tool to simulate the spread of an infection in a population. You will learn:
- How to simulate entities moving around an environment and infecting each other.
- How to record data and observations as the simulation is running.
The simulation uses a Susceptible-Infected-Removed (SIR) model:
- Each person has a status:
- Susceptible to the infection.
- Currently infected.
- Removed from the infected population (recovered or dead).
- Infected people can infect susceptible people.
- After a certain amount of time, infected people are removed.
Script source code
infection
#!/usr/bin/env python3
import collections
import csv
import enum
import itertools
import math
import operator
import random
import sys
Status = enum.Enum("Status", ["S", "I", "R"])
Observation = collections.namedtuple(
"Observation", ["day", "S", "I", "R"]
)
def simulate(
*, N, probability, radius, duration, width, height
):
"""Simulate an infection and yield observations.
Start with one infected person. Run the simulation until
no one is infected.
The world is modelled as a 2D rectangle.
People move around by picking a random angle and walking
one meter in that direction.
N -- how many people to use. E.g. 1000
probability -- the probability of being infected inside
the danger zone.
radius -- the danger zone around an infected person
(meters). E.g. 3
duration -- how long the infection lasts. An iterable of
choices. E.g. [10, 11, 12]
width -- the width of the world (meters).
height -- the height of the world (meters).
"""
status = [Status.S] * N
timer = [0] * N
x = [random.uniform(0, width) for _ in range(N)]
y = [random.uniform(0, height) for _ in range(N)]
# Infect a random person.
status[0] = Status.I
timer[0] = random.choice(duration)
for day in itertools.count():
frequency = collections.Counter(status)
yield Observation(
day=day,
S=frequency[Status.S],
I=frequency[Status.I],
R=frequency[Status.R],
)
if frequency[Status.I] == 0:
break
# Update the infection timers.
for n in range(N):
if status[n] is Status.I:
timer[n] -= 1
if timer[n] <= 0:
status[n] = Status.R
# Infect new people.
susceptible = [
n for n in range(N) if status[n] is Status.S
]
infected = [
n for n in range(N) if status[n] is Status.I
]
for n in susceptible:
in_infection_zone = any(
math.hypot(x[n] - x[m], y[n] - y[m])
<= radius
for m in infected
)
if in_infection_zone:
caught_infection = (
random.uniform(0, 1) < probability
)
if caught_infection:
status[n] = Status.I
timer[n] = random.choice(duration)
# Move people around the world.
for n in range(N):
angle = random.uniform(0, 2 * math.pi)
x[n] += math.cos(angle)
y[n] += math.sin(angle)
x[n] = min(max(x[n], 0), width)
y[n] = min(max(y[n], 0), height)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--N",
help="how many people to use",
type=int,
required=True,
metavar="NUMBER",
)
parser.add_argument(
"--probability",
help="the probability of being infected inside the danger zone",
type=float,
required=True,
metavar="P",
)
parser.add_argument(
"--radius",
help="the danger zone around an infected person.",
type=float,
required=True,
metavar="METERS",
)
parser.add_argument(
"--duration",
help="how long the infection lasts.",
type=int,
nargs="+",
required=True,
metavar="DAYS",
)
parser.add_argument(
"--width",
help="the width of the world.",
type=float,
required=True,
metavar="METERS",
)
parser.add_argument(
"--height",
help="the height of the world.",
type=float,
required=True,
metavar="METERS",
)
parser.add_argument(
"--report",
help="report a CSV file or a summary",
choices=["csv", "summary"],
required=True,
)
args = parser.parse_args()
simulation = simulate(
N=args.N,
probability=args.probability,
radius=args.radius,
duration=args.duration,
width=args.width,
height=args.height,
)
if args.report == "csv":
writer = csv.DictWriter(
sys.stdout, fieldnames=["day", "S", "I", "R"]
)
writer.writeheader()
for observation in simulation:
writer.writerow(observation._asdict())
elif args.report == "summary":
observations = list(simulation)
n_days = len(observations)
worst = max(
observations, key=operator.attrgetter("I")
)
print(f"The infection took {n_days} days to end.")
print(
f"Day {worst.day} was the worst day:",
f"{worst.I} people were infected at once.",
)
$ ./infection --help
usage: infection [-h] --N NUMBER --probability P --radius METERS --duration
DAYS [DAYS ...] --width METERS --height METERS --report
{csv,summary}
optional arguments:
-h, --help show this help message and exit
--N NUMBER how many people to use
--probability P the probability of being infected inside the danger
zone
--radius METERS the danger zone around an infected person.
--duration DAYS [DAYS ...]
how long the infection lasts.
--width METERS the width of the world.
--height METERS the height of the world.
--report {csv,summary}
report a CSV file or a summary
Using the script to summarize the results of an infection
I want to understand how changing the parameters of an infection changes:
- How long the infection lasts.
- How many people were sick on the worst day.
I generate a summary report for an infection using the --report summary
flag:
$ ./infection --N 10000 --report summary \ --probability 0.2 --radius 1 --duration 5 \ --width 100 --height 100
The infection took 243 days to end.
Day 175 was the worst day: 32 people were infected at once.
I increase the radius of the original infection from 1 meter to 2 meters:
$ ./infection --N 10000 --report summary \ --probability 0.2 --radius 2 --duration 5 \ --width 100 --height 100
The infection took 95 days to end.
Day 46 was the worst day: 1093 people were infected at once.
Increasing the infection radius reduces the duration of the infection (243 days to 95 days) but dramatically increases the number of infected people on the worst day (32 people to 1093 people).
But what if the probability of the infection gets lower because people about better about washing their hands and not touching their faces? (I keep the infection radius at 2 meters and change the probability of infection from 20% to 5%.)
./infection --N 10000 --report summary \ --probability 0.05 --radius 2 --duration 5 \ --width 100 --height 100
The infection took 647 days to end.
Day 127 was the worst day: 127 people were infected at once.
Lowering the infection probability increases the duration of the infection (95 days to 647 days) but dramatically decreases the number of infected people on the worst day (1093 people to 127 people).
Using the script to generate a dataset
I generate a dataset that I can later analyze using the --report csv
flag:
$ ./infection --N 100 --report csv \ --probability 0.2 --radius 1 --duration 5 \ --width 10 --height 10 \ > data.csv
$ cat data.csv
day,S,I,R
0,99,1,0
1,97,3,0
2,97,3,0
3,95,5,0
4,92,8,0
5,88,11,1
6,86,11,3
7,85,12,3
8,82,13,5
9,79,13,8
10,78,10,12
11,77,9,14
12,75,10,15
13,72,10,18
14,71,8,21
15,69,9,22
16,67,10,23
17,66,9,25
18,66,6,28
19,65,6,29
20,62,7,31
21,62,5,33
22,61,5,34
23,61,5,34
24,61,4,35
25,61,1,38
26,60,2,38
27,60,1,39
28,60,1,39
29,60,1,39
30,60,1,39
31,60,0,40
Then I load this data into R for further analysis:
$ R
R version 3.5.2 (2018-12-20) -- "Eggshell Igloo"
Copyright (C) 2018 The R Foundation for Statistical Computing
Platform: x86_64-apple-darwin18.2.0 (64-bit)
...
Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.
> data <- read.csv("data.csv")
> summary(data)
day S I R
Min. : 0.00 Min. :60.00 Min. : 0.00 Min. : 0.00
1st Qu.: 7.75 1st Qu.:61.00 1st Qu.: 2.75 1st Qu.: 4.50
Median :15.50 Median :68.00 Median : 6.00 Median :22.50
Mean :15.50 Mean :72.94 Mean : 6.25 Mean :20.81
3rd Qu.:23.25 3rd Qu.:82.75 3rd Qu.:10.00 3rd Qu.:34.25
Max. :31.00 Max. :99.00 Max. :13.00 Max. :40.00
How the script works
I use an Enum object to represent the status of a person:
susceptible (Status.S
), infected (Status.I
), or removed (Status.R
).
I use a namedtuple object to represent a tally of the susceptible, infected, and removed people for a given day.
simulate
is a generator function that yields
Observation
objects until no more people are infected.
random.uniform samples a continuous uniform distribution to randomly choose x- and y-coordinates for the people in the simulation.
itertools.count generates an infinite sequence of numbers: 0, 1, 2, 3, .... I use this to keep track of the "days" as the simulation runs.
collections.Counter creates a multiset that counts the current status of everyone in the simulation. As a result, I can check how many people are currently infected and terminate the simulation when there are no longer any infections.
I use a list of "timers" to keep track of how much longer an infected person has before the infection resolves. When someone is infected, I set the corresponding timer to the duration of the illness. During each day, I decrement the timers. When an infected person's timer reaches 0, they are no longer infected and are removed.
During each day, I check every susceptible person to see if they get infected that day. I use any to check if a susceptible person is within the infection radius of any infected person.
When someone is within an infection radius, I generate a uniform random number between 0 and 1 and compare that to the infection probability:
random.uniform(0, 1) < probability
The probability that this comparison is True is the infection probability. As a result, I use this comparison to decide if someone within an infection radius does get infected that day.
To move people around the world, I generate a random angle in radians. Then, I calculate the change in the x-coordinate using math.cos (cosine) and the change in the y-coordinate using math.sin (sine). (These calculations use the unit circle.) After updating the coordinates, I stop people from walking out of the boundaries of the world using min and max.
To generate a CSV file, I use a
DictWriter object from the csv module.
I convert each Observation
object into a
dictionary using the
_asdict method. E.g.
Observation(day=12, S=321, I=94, R=100)._asdict()
OrderedDict([('day', 12), ('S', 321), ('I', 94), ('R', 100)])
To find the worst day for the summary report, I create a key function using attrgetter along with max to find the day with the maximum number of infected people.
In conclusion...
In this week's post, you learned how to create a command line tool in Python that simulates the spread of an infection the population.
My challenge to you:
In the simulation, people move one meter in a random direction each day.
- Change this so that they move a random distance (e.g. between 1 and 3 meters) each day. How does this affect the spread of the infection?
- Change this so that infected people move slower (e.g. 0.5 meters) than non-infected people. How does this affect the spread of the infection?
If you enjoyed this week's post, share it with your friends and stay tuned for next week's post. See you then!
(If you spot any errors or typos on this post, contact me via my contact page.)