
Building a command line tool to manipulate CSV files

By John Lekberg on September 26, 2020.


In this week's post, you will learn how to build a command line tool that manipulates CSV files.

Script source code

csv-proc

#!/usr/bin/env python3

from collections import OrderedDict
import csv
import logging
import sys

logging.basicConfig(format="%(levelname)s: %(message)s")


def MAIN():
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "file", help="CSV file. E.g. 'data.csv'."
    )
    parser.add_argument(
        "command",
        help="Python code to run on each row of the file.",
    )
    parser.add_argument(
        "--init",
        default="",
        metavar="CODE",
        help="Python code to run before processing begins.",
    )
    args = parser.parse_args()

    code_command = compile_or_exit(args.command)
    code_init = compile_or_exit(args.init)

    # Run the --init code once; names it defines stay visible to the
    # per-row command through this shared global namespace.
    env_global = {}
    exec(code_init, env_global)

    with open(args.file, "rt") as file:
        dialect = guess_dialect(file)
        reader = csv.DictReader(file, dialect=dialect)

        results = []
        for row in reader:
            # Expose the current row to the command as `row`.
            env_global["row"] = RowProxy(row)
            env_local = OrderedDict()
            exec(code_command, env_global, env_local)
            # Copy the original row, then add any non-underscore
            # variables the command created as new columns.
            result = row.copy()
            result.update(
                (k, v)
                for k, v in env_local.items()
                if not k.startswith("_")
            )
            results.append(result)

    # Use the keys of the first result as the output header.
    fieldnames = tuple(results[0].keys())
    writer = csv.DictWriter(
        sys.stdout, fieldnames=fieldnames, dialect=dialect
    )
    writer.writeheader()
    for row in results:
        writer.writerow(row)


def compile_or_exit(text):
    """Return a compiled statement or call sys.exit on failure."""
    try:
        return compile(text, "<string>", "exec")
    except SyntaxError as error:
        logging.error(f"Unable to parse {text!r}")
        sys.exit(1)


def guess_dialect(file):
    """Guess the CSV dialect of a file by examining a sample."""
    sample = file.read(2048)
    file.seek(0)
    dialect = csv.Sniffer().sniff(sample)
    return dialect


class RowProxy:
    """Read-only proxy to dict object.

    Allows element access like `d["x"]` and `d.x`.
    """

    def __init__(self, row):
        self._row = row

    def __getitem__(self, key):
        return self._row[key]

    def __getattr__(self, key):
        return self._row[key]


if __name__ == "__main__":
    MAIN()

$ csv-proc --help
usage: csv-proc [-h] [--init CODE] file command

positional arguments:
  file         CSV file. E.g. 'data.csv'.
  command      Python code to run on each row of the file.

optional arguments:
  -h, --help   show this help message and exit
  --init CODE  Python code to run before processing begins.

Using the script to classify BMI and blood glucose

In this example, I have medical data on some hospital patients:

data.csv

id,height_ft,weight_lb,glucose_mgdL
PT-9357,5.96,189,124
PT-5315,5.53,180,106
PT-7733,6.15,222,126
PT-9667,6.10,174,104
PT-8893,6.00,232,98
PT-1125,5.97,187,85
PT-1222,5.82,237,104
PT-7681,6.00,198,134
PT-7043,5.98,227,78
PT-2623,5.55,204,125
PT-7031,5.50,216,107
PT-4947,5.91,199,105
PT-9711,6.13,237,107
PT-3737,6.03,179,111
PT-2931,5.78,187,87

Here's what the columns mean:

- id: the patient's ID.
- height_ft: the patient's height, in feet.
- weight_lb: the patient's weight, in pounds.
- glucose_mgdL: the patient's blood glucose level, in milligrams per deciliter (mg/dL).

From this data, I want to make several calculations:

- bmi: the patient's body mass index (BMI), rounded to one decimal place.
- bmi_cat: a BMI category: underweight, normal, overweight, or obese.
- glucose_cat: a blood glucose category: normal, prediabetic, or diabetic.

Here's how I can use csv-proc to accomplish this:

$ ./csv-proc data.csv '
_height_m = float(row.height_ft) * 0.3048
_weight_kg = float(row.weight_lb) * 0.453592
bmi = _weight_kg / _height_m ** 2
bmi = round(bmi, 1)
if bmi < 18.5:
    bmi_cat = "underweight"
elif 18.5 <= bmi < 25:
    bmi_cat = "normal"
elif 25 <= bmi < 30:
    bmi_cat = "overweight"
elif 30 <= bmi:
    bmi_cat = "obese"

_glucose_mgdL = float(row.glucose_mgdL)
if _glucose_mgdL < 100:
    glucose_cat = "normal"
elif 100 <= _glucose_mgdL < 126:
    glucose_cat = "prediabetic"
elif 126 <= _glucose_mgdL:
    glucose_cat = "diabetic"
'
id,height_ft,weight_lb,glucose_mgdL,bmi,bmi_cat,glucose_cat
PT-9357,5.96,189,124,26.0,overweight,prediabetic
PT-5315,5.53,180,106,28.7,overweight,prediabetic
PT-7733,6.15,222,126,28.7,overweight,diabetic
PT-9667,6.10,174,104,22.8,normal,prediabetic
PT-8893,6.00,232,98,31.5,obese,normal
PT-1125,5.97,187,85,25.6,overweight,normal
PT-1222,5.82,237,104,34.2,obese,prediabetic
PT-7681,6.00,198,134,26.9,overweight,diabetic
PT-7043,5.98,227,78,31.0,obese,normal
PT-2623,5.55,204,125,32.3,obese,prediabetic
PT-7031,5.50,216,107,34.9,obese,prediabetic
PT-4947,5.91,199,105,27.8,overweight,prediabetic
PT-9711,6.13,237,107,30.8,obese,prediabetic
PT-3737,6.03,179,111,24.0,normal,prediabetic
PT-2931,5.78,187,87,27.3,overweight,normal

(This uses the functions float and round.)
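
As a quick sanity check, here is the same arithmetic for the first patient (PT-9357) done by hand in the Python REPL; the result matches the 26.0 in the first output row.

>>> height_m = 5.96 * 0.3048       # feet to meters
>>> weight_kg = 189 * 0.453592     # pounds to kilograms
>>> round(weight_kg / height_m ** 2, 1)
26.0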

I could also accomplish this by defining some helper functions using the --init flag:

$ ./csv-proc data.csv '

_height_m = N(row.height_ft) * 0.3048
_weight_kg = N(row.weight_lb) * 0.453592
bmi = round(_weight_kg / _height_m ** 2, 1)
bmi_cat = classify_bmi(bmi)
_glucose_mgdL = N(row.glucose_mgdL)
glucose_cat = classify_glucose(_glucose_mgdL)

' --init '

from bisect import bisect_right
from functools import partial

N = float

def classify(x, *, cutoff, label):
    return label[bisect_right(cutoff, x)]

classify_bmi = partial(
    classify,
    cutoff=[18.5, 25, 30],
    label="underweight normal overweight obese".split(),
)

classify_glucose = partial(
    classify,
    cutoff=[100, 126],
    label="normal prediabetic diabetic".split(),
)

'
id,height_ft,weight_lb,glucose_mgdL,bmi,bmi_cat,glucose_cat
PT-9357,5.96,189,124,26.0,overweight,prediabetic
PT-5315,5.53,180,106,28.7,overweight,prediabetic
PT-7733,6.15,222,126,28.7,overweight,diabetic
PT-9667,6.10,174,104,22.8,normal,prediabetic
PT-8893,6.00,232,98,31.5,obese,normal
PT-1125,5.97,187,85,25.6,overweight,normal
PT-1222,5.82,237,104,34.2,obese,prediabetic
PT-7681,6.00,198,134,26.9,overweight,diabetic
PT-7043,5.98,227,78,31.0,obese,normal
PT-2623,5.55,204,125,32.3,obese,prediabetic
PT-7031,5.50,216,107,34.9,obese,prediabetic
PT-4947,5.91,199,105,27.8,overweight,prediabetic
PT-9711,6.13,237,107,30.8,obese,prediabetic
PT-3737,6.03,179,111,24.0,normal,prediabetic
PT-2931,5.78,187,87,27.3,overweight,normal

(This uses the functions float, functools.partial, and bisect.bisect_right.)
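
To see why bisect_right works as a classifier, take the BMI of 26.0 from the first row: bisect_right returns the index at which 26.0 would be inserted into the sorted cutoff list, and that index selects the matching label.

>>> from bisect import bisect_right
>>> cutoff = [18.5, 25, 30]
>>> label = "underweight normal overweight obese".split()
>>> bisect_right(cutoff, 26.0)
2
>>> label[2]
'overweight'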

How the script works

I use the csv module to read and write CSV files. The method csv.Sniffer.sniff analyzes a sample of a CSV file and guesses its "dialect" (delimiter, quoting, etc.). I use csv.DictReader and csv.DictWriter instead of csv.reader and csv.writer because I want to expose the rows as dictionaries instead of lists.
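
For example, sniffing the dialect of data.csv and reading its first row in the REPL looks roughly like this (note that the values come back as strings):

>>> import csv
>>> file = open("data.csv", "rt")
>>> dialect = csv.Sniffer().sniff(file.read(2048))
>>> file.seek(0)
0
>>> reader = csv.DictReader(file, dialect=dialect)
>>> row = next(reader)
>>> row["id"], row["glucose_mgdL"]
('PT-9357', '124')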

I use compile to prepare text to be executed as code by exec. To determine the new columns to add to the output, I examine the collections.OrderedDict env_local after processing each row: any new names it contains become new columns. I ignore local variables that start with _ because I want an easy way to create "temporary variables" that won't show up in the output.
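
Here is a minimal sketch of that idea outside the script (a plain dict stands in for the OrderedDict):

>>> code = compile("bmi = 26.0\n_height_m = 1.82", "<string>", "exec")
>>> env_local = {}
>>> exec(code, {}, env_local)
>>> {k: v for k, v in env_local.items() if not k.startswith("_")}
{'bmi': 26.0}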

When I process each row of the input CSV file, I create a RowProxy object that allows accessing dictionary entries as attributes. For example:

>>> rp = RowProxy({"cat": 3})
>>> rp["cat"]
3
>>> rp.cat
3

In conclusion...

In this week's post you learned how to build a command line tool that manipulates CSV files. You learned how to use the csv module to read and write CSV files, as well as how to use csv.Sniffer to automatically deduce the "dialect" of a CSV file. You also learned how to use compile and exec to run text as Python code.

My challenge to you:

Build a tool like csv-proc that allows you to filter CSV files based on some criteria. For example, if you only want to view prediabetic patients from data.csv, you could call your program like this:

$ csv-filter data.csv '100 <= float(row.glucose_mgdL) < 126'
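
If you want a starting point, one possible approach (not the only one) is to reuse the structure of csv-proc, but compile the expression with mode "eval" and keep only the rows where it evaluates to something truthy. A rough sketch of the core loop, assuming an args.expr argument and reusing guess_dialect and RowProxy from csv-proc:

code = compile(args.expr, "<string>", "eval")
with open(args.file, "rt") as file:
    dialect = guess_dialect(file)
    reader = csv.DictReader(file, dialect=dialect)
    # Keep only the rows where the expression is truthy.
    kept = [row for row in reader if eval(code, {"row": RowProxy(row)})]

writer = csv.DictWriter(sys.stdout, fieldnames=reader.fieldnames, dialect=dialect)
writer.writeheader()
writer.writerows(kept)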

If you enjoyed this week's post, share it with your friends and stay tuned for next week's post. See you then!


(If you spot any errors or typos on this post, contact me via my contact page.)