Return to Blog

Recreating the command line tool uniq in Python

By John Lekberg on March 18, 2020.


This week's post will cover recreating the command line tool uniq in Python. You will learn:

Uniq is a command line tool that removes repeated lines from text. For example, I have this data file

data.txt

a
a
a
b
b
b
c
c
a
b
b
b
b
b
c

I use uniq to remove repeated lines:

$ uniq data.txt
a
b
c
a
b
c

Uniq is specified by the POSIX standard. (See "uniq" for details.)

Recreating uniq in Python is a good way to practice using Python's standard library to build command line tools.

Script source code

uniq

#!/usr/bin/env python3
"""
A recreation of the POSIX uniq tool.
"""

import collections
import enum
import itertools
import re
import sys
import functools

UniqMode = enum.Enum(
    "UniqMode",
    [
        "normal",
        "count",
        "suppress_repeated",
        "suppress_not_repeated",
    ],
)
UniqMode.__doc__ = """
UniqMode enumerates the different modes that the script can
run in.

- UniqMode.normal is the default mode.
- UniqMode.count corresponds to the "-c" flag.
- UniqMode.suppress_repeated corresponds to the "-u" flag.
- UniqMode.suppress_not_repeated corresponds to the "-d"
  flag.
"""


def run(*, mode, input_file, output_file, fields, char):
    """Run the uniq script.
    
    mode -- the UniqMode of the script.
    input_file -- read lines from this file.
    output_file -- output information to this file.
    fields -- the number of fields to skip.
    char -- the number of characters to skip after fields.

    """
    lines = map(remove_newline, input_file)
    line_key = functools.partial(
        preprocess_line, fields=fields, char=char
    )

    for _, duplicates in itertools.groupby(lines, line_key):
        duplicates = tuple(duplicates)

        repeated = len(duplicates) > 1
        if mode is UniqMode.suppress_repeated and repeated:
            continue
        if (
            mode is UniqMode.suppress_not_repeated
            and not repeated
        ):
            continue

        if mode is UniqMode.count:
            message = f"{len(duplicates)} {duplicates[0]}"
        else:
            message = f"{duplicates[0]}"

        print(message, file=output_file)


def preprocess_line(line, *, fields, char):
    """Preprocess a line by removing initial fields and
    characters.

    fields -- the number of fields to remove.
    char -- the number of characters to remove after the
            fields.

    """
    if fields > 0:
        line = re.sub(r"\s*\S*", "", line, count=fields)
    line = line[char:]
    return line


def remove_newline(text):
    """Remove a trailing newline from text."""
    if text.endswith("\n"):
        text = text[:-1]
    return text


def positive_integer(text):
    """Parse a positive integer from text.

    Raises an Exception for bad text.
    """
    n = int(text)
    if n <= 0:
        raise ValueError()
    return n


def input_FileType(filename):
    """Like argparse.FileType("r") but the filename "-" is
    turned into sys.stdin.
    """
    if filename == "-":
        return sys.stdin
    else:
        return open(filename, "r")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="report or filter out repeated lines in a file",
        prefix_chars="-+",
    )
    parser.set_defaults(mode=UniqMode.normal)
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument(
        "-c",
        "+c",
        help="""
        Precede each output line with a count of the number
        of times the line occurred in the input.
        """,
        action="store_const",
        const=UniqMode.count,
        dest="mode",
    )
    mode_group.add_argument(
        "-d",
        "+d",
        help="""
        Suppress the writing of lines that are not repeated
        in the input.
        """,
        action="store_const",
        const=UniqMode.suppress_not_repeated,
        dest="mode",
    )
    mode_group.add_argument(
        "-u",
        "+u",
        help="""
        Suppress the writing of lines that are repeated in
        the input.
        """,
        action="store_const",
        const=UniqMode.suppress_repeated,
        dest="mode",
    )
    parser.add_argument(
        "-f",
        "+f",
        metavar="fields",
        type=positive_integer,
        help="""
        Ignore the first fields fields on each input line
        when doing comparisons, where fields is a positive
        decimal integer. A field is the maximal string
        matched by the basic regular expression:
        /[[:blank:]]*[^[:blank:]]*/. If the fields
        option-argument specifies more fields than appear on
        an input line, a null string shall be used for
        comparison.
        """,
        default=0,
    )
    parser.add_argument(
        "-s",
        "+s",
        metavar="char",
        type=positive_integer,
        help="""
        Ignore the first chars characters when doing
        comparisons, where chars shall be a positive decimal
        integer. If specified in conjunction with the -f
        option, the first chars characters after the first
        fields fields shall be ignored. If the chars
        option-argument specifies more characters than
        remain on an input line, a null string shall be used
        for comparison.
        """,
        default=0,
    )
    parser.add_argument(
        "input_file",
        help="""
        A pathname of the input file. If the input_file
        operand is not specified, or if the input_file is
        '-', the standard input shall be used.
        """,
        nargs="?",
        type=input_FileType,
        default=sys.stdin,
    )
    parser.add_argument(
        "output_file",
        help="""
        A pathname of the output file. If the output_file
        operand is not specified, the standard output shall
        be used. The results are unspecified if the file
        named by output_file is the file named by
        input_file.
        """,
        nargs="?",
        type=argparse.FileType("w"),
        default=sys.stdout,
    )
    args = parser.parse_args()

    with args.input_file, args.output_file:
        run(
            mode=args.mode,
            input_file=args.input_file,
            output_file=args.output_file,
            fields=args.f,
            char=args.s,
        )
$ ./uniq --help
usage: uniq [-h] [-c | -d | -u] [-f fields] [-s char]
            [input_file] [output_file]

report or filter out repeated lines in a file

positional arguments:
  ...

optional arguments:
  ...

Using the script on data

I have a sorted list of words from the Declaration of Independence:

declaration-words.txt

all
among
among
and
and
and
and
another
are
are
are
assume
bands
be
[...]
truths
unalienable
we
when
which
which
which
with
with

(Some data is omitted with "[...]" for presentation.)

I use my uniq script to count how frequently each word appears:

$ ./uniq -c declaration-words.txt
1 a
1 all
2 among
4 and
1 another
3 are
1 assume
1 bands
1 be
[...]
1 truths
1 unalienable
1 we
1 when
3 which
2 with

(Some data is omitted with "[...]" for presentation.)

How the script works

I use the argparse module to build the command line interface. (Argparse was introduced in PEP-389.)

Uniq supports 4 different ways of operating.

  1. Remove repeated lines.
  2. Remove repeated lines and print how many repetitions occurred.
  3. Remove repeated lines and only print lines that are repeated.
  4. Remove repeated lines and only print lines that are not repeated.

Only one of these 4 "modes" can be active at once, so I use an enum, UniqMode. Enums are useful for representing something that has a finite set of states. I prefer using enums to strings because Python will warn me if I mistype an enum (e.g. UniqMode.norml).

Different command line switches activate different modes: -c, -u, and -d. I use ArgumentParser.add_mutually_exclusive_group to ensure that at most, one mode switch is activated. Because these three switches all set the mode, I use keyword arguments to have [ArgumentParser.add_argument][py.argpase.ArgumentParser.add_argument] send the mode value to the same place:

I use ArgumentParser.set_defaults to set the default mode as UniqMode.normal.

Argparse also allows arguments to be processed and validated, using a function supplied in the type keyword-parameter. In this script:

argparse.FileType is like the built-in function open.

The core of this script is the itertools.groupby function. Groupby removes duplicate elements from a list, using a key function to compare elements.

In conclusion...

The argparse module allows you to build command line interfaces. Using argparse is more robust than manually parsing sys.argv. Enums can be used to represent a set of finite states (different "modes") and are more robust that using strings because Python will warn you if you misspell an enum value. itertools.groupby allows you to group repeated elements in an iterable using a key function and is the core of this uniq script.

My challenge to you:

Build a command line interface, using argparse, that simulates the command line interface of awk. (This will be a bit simpler than the interface used by uniq.)

If you enjoyed this week's post, share it with your friends and stay tuned for next week's post. See you then!


(If you spot any errors or typos on this post, contact me via my contact page.)