Dec 05, 2020

Query SVN Using Python Async and Await

I've been a fan of Python's asynchronous libraries for a while now. I realized I could significantly speed up a script I wrote that used subprocess to call svn by making those calls asynchronous.

The Problem

We use Subversion at work, and it's fine. We have a pretty large mono-repo. While conversing with one of my colleagues, we started wondering who wrote more lines in the subsection of the code base that we work on. I had been there nine years to his five, but hadn't spent much time in that area of the code base in a couple of years. He had been working exclusively in that area since he started.

I know this isn't important, but it piqued my curiosity nonetheless. And I'll take any opportunity I can find to write some Python. I'm sure I could have done this in PowerShell, but I know Python much better, so I started there.

The Synchronous Version

My basic plan was as follows:

  1. Build up a list of files using the glob module with a list of patterns.
  2. Create an object for each of those files to encapsulate running svn blame and parsing the output.
  3. Run blame using subprocess.run.
  4. Combine the results together into a dictionary keyed by author.
  5. Print the author and the total number of lines.

contributors.py

"""
Get the line counts from svn for a set of files and combine them into
an overall line count per author.
"""

import collections
import glob
import itertools
from pathlib import Path
import subprocess
from typing import Dict, List
import xml.etree.ElementTree as ET


class FailedCommandException(Exception):
    """Exception indicating a subprocess command returned a non-zero exit code."""
    cmd_args: List
    stdout: bytes
    stderr: bytes

    def __init__(self, cmd_args: List, stdout: bytes, stderr: bytes):
        # Stored as cmd_args because Exception.__init__ overwrites
        # self.args with the message tuple.
        self.cmd_args = cmd_args
        self.stdout = stdout
        self.stderr = stderr
        super().__init__(f"Failed to run command {cmd_args}")


class ContributorCount(collections.defaultdict):
    """Count the lines in a file authored by each person.

    The dictionary stores a list of line data keyed by author.
    The line data includes the line number and the date it was modified.

    """

    file_path: Path

    def __init__(self, file_path: Path):
        self.file_path = file_path
        print(self.file_path)
        super().__init__(list)

    def count(self):
        """Run svn blame. Then organize the results by author."""

        result = subprocess.run(
            ['svn', 'blame', '--xml', self.file_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        if result.returncode != 0:
            raise FailedCommandException(
                result.args, result.stdout, result.stderr
            )

        blame = ET.fromstring(result.stdout)

        for entry in blame.findall('./target/entry'):
            data = entry.attrib
            commit = entry.find('commit')
            data.update(commit.attrib)
            author = commit.find('author').text
            self[author.lower()].append(data)

    def __str__(self) -> str:
        result = [str(self.file_path)]
        result += [f"{name} {len(lines)}" for name, lines in self.items()]
        return '\n'.join(result)

    @classmethod
    def get_count(cls, file_path: Path) -> 'ContributorCount':
        """Helper method to create the object and count it."""
        count = cls(file_path)
        count.count()
        return count


def add_counts(
    combined_counts: collections.defaultdict, file_path: str
):
    """Get the counts for the file and add them to combined counts."""
    try:
        result = ContributorCount.get_count(file_path)
    except FailedCommandException as err:
        print(err)
    else:
        for name, lines in result.items():
            combined_counts[name][result.file_path] = lines


def main(*patterns: str):
    """Get line counts per author for most of the interface files."""
    # Build a list of all the files to look at
    file_paths = itertools.chain.from_iterable(glob.iglob(p) for p in patterns)

    combined_counts: Dict[str, Dict[str, List[Dict[str, str]]]] = collections.defaultdict(dict)

    # Combine the counts for each file
    for path in file_paths:
        add_counts(combined_counts, path)

    # Further combine combined_counts to return an overall line count per author
    overall_counts = [
        (name, sum(len(lines) for lines in files.values()))
        for name, files
        in combined_counts.items()
    ]

    # Sort the line counts descending and print them
    for name, count in sorted(overall_counts, key=lambda x: x[1], reverse=True):
        print(name, count)


if __name__ == "__main__":
    main(
        'file1',
        'file2',
        'file3',
    )
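
If you haven't seen the XML that svn blame --xml produces, here's a rough sketch (the sample data is made up, and the exact attributes can vary between svn versions) of the shape the count method picks apart with findall and attrib:

import xml.etree.ElementTree as ET

# Made-up sample in the shape svn blame --xml emits: one entry per
# source line, each carrying a line-number attribute and a commit
# child with the revision, author, and date.
SAMPLE = """<?xml version="1.0"?>
<blame>
<target path="file1">
<entry line-number="1">
<commit revision="1234">
<author>alice</author>
<date>2020-11-30T12:00:00.000000Z</date>
</commit>
</entry>
<entry line-number="2">
<commit revision="1250">
<author>bob</author>
<date>2020-12-01T08:30:00.000000Z</date>
</commit>
</entry>
</target>
</blame>"""

root = ET.fromstring(SAMPLE)
for entry in root.findall('./target/entry'):
    data = entry.attrib                      # e.g. {'line-number': '1'}
    commit = entry.find('commit')
    data.update(commit.attrib)               # adds {'revision': '1234'}
    print(commit.find('author').text, data)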

This worked, but there was a problem. It took over two minutes to run for a list of 116 files. That was unacceptable, because I needed to check even more files. All the time was spent waiting for responses from the Subversion server. So how do you speed things up when you spend most of your time waiting for a response?

Async to the Rescue

If you aren't familiar with Python's concurrency model (which is similar to that of many other languages), it's an easy-to-use abstraction that lets your code do other work while it would otherwise be blocked waiting on something. So, in my case, I wanted to make several blame calls concurrently and then process the results as they arrived.
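
If that's abstract, here's a minimal toy (not from the script) that fakes three slow two-second requests with asyncio.sleep. Because the waits overlap, the whole thing finishes in roughly two seconds instead of six:

import asyncio
import time

async def fake_request(name: str) -> str:
    # Stand-in for a slow network call; while this sleeps, the event
    # loop is free to run the other tasks.
    await asyncio.sleep(2)
    return f"{name} done"

async def demo():
    start = time.perf_counter()
    results = await asyncio.gather(
        fake_request("a"), fake_request("b"), fake_request("c")
    )
    print(results, f"in {time.perf_counter() - start:.1f}s")  # ~2.0s, not ~6

asyncio.run(demo())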

I've worked with asyncio quite a bit, but I've done it almost exclusively with web requests and aiohttp. So I wasn't quite sure how to wrap a process call to make it asynchronous. Thankfully, I found this blog post from Fredrik Averpil with some code I adapted.

async def run_command(*args) -> str:
    """Run command asynchronously in subprocess.

    Return contents of stdout on success.
    Raise a FailedCommandException if the command fails.

    Example from:
        http://asyncio.readthedocs.io/en/latest/subprocess.html
    """
    # Create subprocess
    process = await asyncio.create_subprocess_exec(
        *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )

    # Wait for the subprocess to finish
    stdout, stderr = await process.communicate()

    if process.returncode != 0:
        raise FailedCommandException(args, stdout, stderr)

    return stdout.decode().strip()

That was pretty simple! Now that this code was asynchronous, I needed to update its callers to await the result, starting with ContributorCount.count.

async def count(self):
    """Run svn blame asynchronously. Then organize the results by author."""
    result = await run_command('svn', 'blame', '--xml', self.file_path)

    blame = ET.fromstring(result)

    for entry in blame.findall('./target/entry'):
        data = entry.attrib
        commit = entry.find('commit')
        data.update(commit.attrib)
        author = commit.find('author').text
        self[author.lower()].append(data)

And get_count and add_counts, too.
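
The changes there are tiny; each one just becomes a coroutine that awaits the call beneath it (both appear again in the final listing below):

@classmethod
async def get_count(cls, file_path: Path) -> 'ContributorCount':
    """Helper method to create the object and count it."""
    count = cls(file_path)
    await count.count()
    return count


async def add_counts(
    combined_counts: collections.defaultdict, file_path: str
):
    """Get the counts for the file and add them to combined counts."""
    try:
        result = await ContributorCount.get_count(file_path)
    except FailedCommandException as err:
        print(err)
    else:
        for name, lines in result.items():
            combined_counts[name][result.file_path] = lines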

In order to actually make things concurrent, I needed to update main to tell Python to kick off the asynchronous tasks together. In Python, you can do this with asyncio.gather. You pass it any number of awaitables; while one task is waiting on a response from its blame call, Python keeps starting and resuming the others until all of them have returned.

async def main(*patterns: str):
    """Get line counts per author for most of the interface files."""
    # Build a list of all the files to look at
    file_paths = itertools.chain.from_iterable(glob.iglob(p) for p in patterns)

    combined_counts: Dict[str, Dict[str, List[Dict[str, str]]]] = collections.defaultdict(dict)

    # Combine the counts for each file
    await asyncio.gather(
        *map(lambda x: add_counts(combined_counts, x), file_paths)
    )

    # Further combine combined_counts to return an overall line count per author
    overall_counts = [
        (name, sum(len(lines) for lines in files.values()))
        for name, files in combined_counts.items()
    ]

    # Sort the line counts descending and print them
    for name, count in sorted(overall_counts, key=lambda x: x[1], reverse=True):
        print(name, count)

Finally, I needed to tell Python to run my async code on an event loop, which has been really easy to do since Python 3.7 with asyncio.run.

if __name__ == "__main__":
    asyncio.run(
        main(
            'file1',
            'file2',
            'file3',
        )
    )

With those changes, I brought the runtime down to 55 seconds for the same set of 116 files. I was pretty happy with that improvement, but there was still a problem.

Rate Limiting

When you convert synchronous code to concurrent code, it's easy to overlook rate limiting. I was watching Process Explorer when I ran the asynchronous version, so it was easy to see the 116 svn processes spring up. As mentioned above, I do work with other people, and even though my machine handled all those processes, making 116 simultaneous requests to the shared Subversion server could be construed as rude. So I needed to limit the number of simultaneous requests.

Semaphores make that really easy in Python. A semaphore is basically a counter with a maximum value: acquiring it decrements the counter, releasing it increments the counter, and when the counter is at zero, acquiring blocks until another task releases. And to make things super easy, you can use the semaphore as an async context manager.
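
Here's a standalone toy (again not from the script, with asyncio.sleep standing in for real work) showing a semaphore capping concurrency at two, so five tasks run in overlapping batches:

import asyncio

async def limited_task(sem: asyncio.Semaphore, n: int):
    # The async context manager acquires on entry and releases on
    # exit, even if the body raises.
    async with sem:
        print(f"task {n} running")
        await asyncio.sleep(1)

async def demo():
    # Create the semaphore inside a coroutine so it's bound to the
    # running event loop (the same caveat the class below handles).
    sem = asyncio.Semaphore(2)  # at most two holders at a time
    await asyncio.gather(*(limited_task(sem, n) for n in range(5)))

asyncio.run(demo())

So I added a semaphore as a class attribute to ContributorCount to control access to the blame call.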

class ContributorCount(collections.defaultdict):
    """Count the lines in a file authored by each person.

    The dictionary stores a list of line data keyed by author.
    The line data includes the line number and the date it was modified.

    """

    # Initialize the Semaphore lazily, the first time it's needed, so
    # it's created on the running event loop.
    lock: Optional[Semaphore] = None

    ...

    async def count(self):
        """Run svn blame asynchronously. Then organize the results by author."""
        async with self.lock:
            result = await run_command('svn', 'blame', '--xml', self.file_path)

        blame = ET.fromstring(result)

        for entry in blame.findall('./target/entry'):
            data = entry.attrib
            commit = entry.find('commit')
            data.update(commit.attrib)
            author = commit.find('author').text
            self[author.lower()].append(data)

How long did it take with the semaphore? About one second longer. How is this possible? The Subversion server is only able to process so many requests at once. Whether I submit them all at once or spread them out, it doesn't make much of a difference.

Conclusion

As usual with things like this, I spent more time than I had planned. But I learned a lot, like how to handle subprocesses asynchronously and how to parse XML data. In the end, I blamed 243 files in 88 seconds. Handling more than twice as many files in well under the original 131 seconds isn't bad, if I do say so myself.

Here's the final version of the file in case you want to do something similar.

contributors.py

"""
Get the line counts from svn for a set of files and combine them into
an overall line count per author.
"""

import asyncio
from asyncio.locks import Semaphore
import collections
import glob
import itertools
from pathlib import Path
from typing import Dict, List, Optional
import xml.etree.ElementTree as ET


class FailedCommandException(Exception):
    """Exception indicating a subprocess command returned a non-zero exit code."""
    cmd_args: List
    stdout: bytes
    stderr: bytes

    def __init__(self, cmd_args: List, stdout: bytes, stderr: bytes):
        # Stored as cmd_args because Exception.__init__ overwrites
        # self.args with the message tuple.
        self.cmd_args = cmd_args
        self.stdout = stdout
        self.stderr = stderr
        super().__init__(f"Failed to run command {cmd_args}")


async def run_command(*args) -> str:
    """Run command asynchronously in subprocess.

    Return contents of stdout on success.
    Raise a FailedCommandException if the command fails.

    Example from:
        http://asyncio.readthedocs.io/en/latest/subprocess.html
    """
    # Create subprocess
    process = await asyncio.create_subprocess_exec(
        *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )

    # Wait for the subprocess to finish
    stdout, stderr = await process.communicate()

    if process.returncode != 0:
        raise FailedCommandException(args, stdout, stderr)

    return stdout.decode().strip()


class ContributorCount(collections.defaultdict):
    """Count the lines in a file authored by each person.

    The dictionary stores a list of line data keyed by author.
    The line data includes the line number and the date it was modified.

    """

    file_path: Path

    # Initialize the Semaphore lazily, the first time it's needed, so
    # it's created on the running event loop.
    lock: Optional[Semaphore] = None

    def __init__(self, file_path: Path):
        self.file_path = file_path
        super().__init__(list)
        if self.__class__.lock is None:
            self.__class__.lock = Semaphore(10)

    async def count(self):
        """Run svn blame asynchronously. Then organize the results by author."""
        async with self.lock:
            result = await run_command('svn', 'blame', '--xml', self.file_path)

        blame = ET.fromstring(result)

        for entry in blame.findall('./target/entry'):
            data = entry.attrib
            commit = entry.find('commit')
            data.update(commit.attrib)
            author = commit.find('author').text
            self[author.lower()].append(data)

    def __str__(self) -> str:
        result = [str(self.file_path)]
        result += [f"{name} {len(lines)}" for name, lines in self.items()]
        return '\n'.join(result)

    @classmethod
    async def get_count(cls, file_path: Path) -> 'ContributorCount':
        """Helper method to create the object and count it."""
        count = cls(file_path)
        await count.count()
        return count


async def add_counts(
    combined_counts: collections.defaultdict, file_path: str
):
    """Get the counts for the file and add them to combined counts."""
    try:
        result = await ContributorCount.get_count(file_path)
    except FailedCommandException as err:
        print(err)
    else:
        for name, lines in result.items():
            combined_counts[name][result.file_path] = lines


async def main(*patterns: str):
    """Get line counts per author for most of the interface files."""
    # Build a list of all the files to look at
    file_paths = itertools.chain.from_iterable(glob.iglob(p) for p in patterns)

    combined_counts: Dict[str, Dict[str, List[Dict[str, str]]]] = collections.defaultdict(dict)

    # Combine the counts for each file
    await asyncio.gather(
        *map(lambda x: add_counts(combined_counts, x), file_paths)
    )

    # Further combine combined_counts to return an overall line count per author
    overall_counts = [
        (name, sum(len(lines) for lines in files.values()))
        for name, files in combined_counts.items()
    ]

    # Sort the line counts descending and print them
    for name, count in sorted(overall_counts, key=lambda x: x[1], reverse=True):
        print(name, count)


if __name__ == "__main__":
    asyncio.run(
        main(
            'file1',
            'file2',
            'file3',
        )
    )