Interacting with a long-running child process in Python - Eli Bendersky's website

The Python subprocess module is a powerful Swiss Army knife for launching and interacting with child processes. It comes with several high-level APIs like call, check_output and (starting with Python 3.5) run that are focused on child processes our program runs and waits to complete.

In this post I want to discuss a variation of this task that is less directly addressed - long-running child processes. Think about testing some server - for example an HTTP server. We launch it as a child process, then connect clients to it and run some testing sequence. When we're done we want to shut down the child process in an orderly way. This would be difficult to achieve with APIs that just run a child process to completion synchronously, so we'll have to look at some of the lower-level APIs.

Sure, we could launch a child process with subprocess.run in one thread and interact with it (via a known port, for example) in another thread. But this would make it tricky to cleanly terminate the child process when we're done with it. If the child process has an orderly termination sequence (such as sending some sort of "quit" command), this is doable. But most servers do not, and will just spin forever until killed. This is the use-case this post addresses.

Launch, interact, terminate and get all output when done

The first, simplest use case will be launching an HTTP server, interacting with it, terminating it cleanly and getting all the server's stdout and stderr when done. Here are the important bits of the code (all full code samples for this post are available here), tested with Python 3.6:

import subprocess
import time
import urllib.request


def main():
    proc = subprocess.Popen(['python3', '-u', '-m', 'http.server', '8070'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    try:
        time.sleep(0.2)
        resp = urllib.request.urlopen('http://localhost:8070')
        assert b'Directory listing' in resp.read()
    finally:
        proc.terminate()
        try:
            outs, _ = proc.communicate(timeout=0.2)
            print('== subprocess exited with rc =', proc.returncode)
            print(outs.decode('utf-8'))
        except subprocess.TimeoutExpired:
            print('subprocess did not terminate in time')

The child process is an HTTP server using Python's own http.server module, serving contents from the directory it was launched in. We use the low-level Popen API to launch the process asynchronously (meaning that Popen returns immediately and the child process runs in the background).

Note the -u passed to Python on invocation: this is critical to avoid stdout buffering and seeing as much of stdout as possible when the process is killed. Buffering is a serious issue when interacting with child processes, and we'll see more examples of this later on.
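To make the effect of -u concrete, here is a minimal sketch. The child is a hypothetical stand-in for a server (it prints one line and then idles), and the `CHILD_CODE` and `first_line_unbuffered` names are illustrative, not part of the original samples:

```python
import subprocess
import sys

# Hypothetical child: prints one line, then idles the way a server would.
CHILD_CODE = 'import time; print("ready"); time.sleep(30)'


def first_line_unbuffered():
    """Start the child with -u and return the first line it prints."""
    proc = subprocess.Popen([sys.executable, '-u', '-c', CHILD_CODE],
                            stdout=subprocess.PIPE)
    try:
        # With -u the line reaches the pipe right away. Without it, the
        # child's stdout is block-buffered when writing to a pipe, so this
        # readline would likely block until the child exits and flushes.
        return proc.stdout.readline()
    finally:
        proc.kill()
        proc.wait()
        proc.stdout.close()
```

Dropping the '-u' from the argument list is an easy way to observe the difference: the call should then stall for roughly as long as the child sleeps.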

The meat of the sample happens in the finally block. proc.terminate() sends the child process a SIGTERM signal. Then, proc.communicate waits for the child to exit and captures all of its stdout. communicate has a very convenient timeout argument starting with Python 3.3 [1], letting us know if the child does not exit for some reason. A more sophisticated technique could be to send the child a SIGKILL (with proc.kill) if it didn't exit due to SIGTERM.
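The SIGKILL fallback mentioned above could look roughly like the following sketch. The `stop_process` helper and its default timeout are assumptions made for illustration; it relies on the documented behavior that retrying communicate after a TimeoutExpired does not lose output:

```python
import subprocess
import sys


def stop_process(proc, timeout=1.0):
    """Ask proc to exit with SIGTERM, escalate to SIGKILL if it ignores us.

    Returns (returncode, captured_stdout). captured_stdout is None when the
    child was not started with stdout=subprocess.PIPE.
    """
    proc.terminate()
    try:
        outs, _ = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # The child did not exit in time; kill it and collect what is left.
        proc.kill()
        outs, _ = proc.communicate()
    return proc.returncode, outs
```

Calling `stop_process(proc)` from the finally block would replace the terminate/communicate pair in the sample, at the cost of hiding whether the child needed to be killed; checking for a return code of -9 on POSIX recovers that information.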

If you run this script, you'll see the output:

$ python3.6 interact-http-server.py
== subprocess exited with rc = -15
Serving HTTP on 0.0.0.0 port 8070 (http://0.0.0.0:8070/) ...
127.0.0.1 - - [05/Jul/2017 05:48:34] "GET / HTTP/1.1" 200 -

The return code of the child is -15 (negative means terminated by a signal, 15 is the numeric code for SIGTERM). The stdout was properly captured and printed out.
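If you'd rather not memorize signal numbers, the standard signal module can translate them. This is a small sketch assuming POSIX return code semantics; `describe_returncode` is a hypothetical helper name:

```python
import signal


def describe_returncode(rc):
    """Describe a Popen.returncode value in words (POSIX semantics assumed)."""
    if rc is None:
        return 'still running'
    if rc < 0:
        # A negative value -N means the child was terminated by signal N.
        return 'killed by {0}'.format(signal.Signals(-rc).name)
    return 'exited with status {0}'.format(rc)
```

For the run shown above, `describe_returncode(proc.returncode)` would give 'killed by SIGTERM'.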

Launch, interact, get output in real time, terminate

A related use case is getting the stdout of a child process in "real-time" and not everything together at the end. Here we have to be really careful about buffering, because it can easily bite and deadlock the program. On Linux, a process's stdout is usually line-buffered when it's attached to a terminal and fully buffered otherwise (for example, when it's redirected to a pipe). Very few processes are fully unbuffered. Therefore, reading stdout in chunks of less than a line is not recommended, in my opinion. Really, just don't do it. Standard I/O is meant to be used in a line-wise way (think of how all the Unix command-line tools work); if you need sub-line granularity, stdout is not the way to go (use a socket or something).

Anyway, to our example:

import subprocess
import threading
import time
import urllib.request


def output_reader(proc):
    for line in iter(proc.stdout.readline, b''):
        print('got line: {0}'.format(line.decode('utf-8')), end='')


def main():
    proc = subprocess.Popen(['python3', '-u', '-m', 'http.server', '8070'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    t = threading.Thread(target=output_reader, args=(proc,))
    t.start()

    try:
        time.sleep(0.2)
        for i in range(4):
            resp = urllib.request.urlopen('http://localhost:8070')
            assert b'Directory listing' in resp.read()
            time.sleep(0.1)
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=0.2)
            print('== subprocess exited with rc =', proc.returncode)
        except subprocess.TimeoutExpired:
            print('subprocess did not terminate in time')
    t.join()

The sample is similar except for how stdout is handled; there are no more calls to communicate; instead, proc.wait just waits for the child to exit (after SIGTERM has been sent). A thread reads from the child's stdout attribute, looping as long as new lines are available and printing them immediately. If you run this sample, you'll notice that the child's stdout is reported in real-time, rather than as one lump at the end.

The iter(proc.stdout.readline, b'') snippet continuously calls proc.stdout.readline() until the call returns an empty bytestring. This only happens at end-of-file, when the child's end of the pipe is closed, which occurs when the child exits. Thus, while it may seem like the reader thread might never terminate - it always will! As long as the child process is running, the thread will dutifully block on that readline; as soon as the child terminates, the readline call returns b'' and the thread exits.
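A quick way to convince yourself of this is to run the same loop against a child that exits on its own. The sketch below uses a hypothetical short-lived child that prints three lines; `collect_lines` and `run_short_child` are illustrative names:

```python
import subprocess
import sys
import threading


def collect_lines(proc, lines):
    # Same sentinel loop as output_reader, but storing lines in a list.
    for line in iter(proc.stdout.readline, b''):
        lines.append(line)


def run_short_child():
    """Run a child that prints three lines; return (lines, thread_alive)."""
    proc = subprocess.Popen(
        [sys.executable, '-u', '-c', 'for i in range(3): print(i)'],
        stdout=subprocess.PIPE)
    lines = []
    t = threading.Thread(target=collect_lines, args=(proc, lines))
    t.start()
    # No terminate() here: the child exits by itself, readline returns b'',
    # and the reader thread falls out of its loop.
    t.join(timeout=10)
    proc.wait()
    proc.stdout.close()
    return lines, t.is_alive()
```

The second element of the result should be False, meaning the thread finished without any explicit signal from the main thread.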

If we don't want to just print the captured stdout, but rather do something with it (such as look for expected patterns), this is easy to organize with Python's thread-safe queue. The reader thread becomes:

def output_reader(proc, outq):
    for line in iter(proc.stdout.readline, b''):
        outq.put(line.decode('utf-8'))

And we launch it with:

outq = queue.Queue()
t = threading.Thread(target=output_reader, args=(proc, outq))
t.start()

Then at any point we can check if there's stuff in the queue by using its non-blocking mode (the full code sample is here):

try:
    line = outq.get(block=False)
    print('got line from outq: {0}'.format(line), end='')
except queue.Empty:
    print('could not get line from queue')
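For the "look for expected patterns" case, a blocking get with a deadline is often more convenient than non-blocking polling. Here is one possible sketch; `wait_for_line` and its parameters are assumptions for illustration, and it expects the queue to hold decoded strings as in the reader thread above:

```python
import queue
import time


def wait_for_line(outq, substring, timeout=2.0):
    """Return the first queued line containing substring, or None on timeout.

    Lines that don't match are consumed and discarded.
    """
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        try:
            line = outq.get(timeout=remaining)
        except queue.Empty:
            return None
        if substring in line:
            return line
```

In a test of the HTTP server, something like `wait_for_line(outq, 'GET /')` after issuing a request would confirm that the server logged it, without sprinkling sleeps through the test.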

Direct interaction with the child's stdin and stdout

This sample is getting into dangerous waters; the subprocess module documentation warns against doing the things described here due to possible deadlocks, but sometimes there's simply no choice! Some programs like using their stdin and stdout for interaction. Alternatively, you may have a program with an interactive (interpreter) mode you'd like to test - like the Python interpreter itself. Sometimes it's OK to feed this program all its input at once and then check its output; this can, and should, be done with communicate - the perfect API for this purpose. It properly feeds stdin, closes it when done (which signals many interactive programs that the game's over), etc. But what if we really want to provide additional input based on some previous output of the child process? Here goes:

import subprocess


def main():
    proc = subprocess.Popen(['python3', '-i'],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)

    # To avoid deadlocks: careful to: add \n to output, flush output, use
    # readline() rather than read()
    proc.stdin.write(b'2+2\n')
    proc.stdin.flush()
    print(proc.stdout.readline())

    proc.stdin.write(b'len("foobar")\n')
    proc.stdin.flush()
    print(proc.stdout.readline())

    proc.stdin.close()
    proc.terminate()
    proc.wait(timeout=0.2)

Let me reiterate what the comment in this code sample is saying:

  • When sending input to a line interpreter, don't forget to send the actual newline.
  • Always flush the stream after placing data into it, since it may be buffered.
  • Use readline to get input from the line interpreter.

We have to be very careful to avoid the following situation:

  1. We send data to the child's stdin, but it doesn't get the complete input for some reason (lack of newline, buffering etc.)
  2. We then invoke readline to wait for the reply.

Since the child is still waiting for input to complete (step 1), our step 2 may hang forever. This is a classic deadlock.

At the end of the interaction, we close the child's stdin (this is optional but useful for some kinds of child processes), call terminate and then wait. It would be better to send the child process some sort of "exit" command (quit() in the case of the Python interpreter); the terminate here is to demonstrate what we have to do if the other options are unavailable. Note that we could also use communicate here instead of wait to capture the stderr output.
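To illustrate the write/flush/readline discipline together with an orderly exit, here is a sketch against a hypothetical line-oriented child (not the Python interpreter): it upper-cases each line it receives and exits when its stdin reaches end-of-file. The `CHILD_CODE`, `ask` and `run_session` names are illustrative:

```python
import subprocess
import sys

# Hypothetical child: echoes each input line in upper case and flushes,
# then exits on its own once stdin is closed.
CHILD_CODE = (
    'import sys\n'
    'for line in iter(sys.stdin.readline, ""):\n'
    '    print(line.strip().upper(), flush=True)\n'
)


def ask(proc, text):
    """Send one line to the child and block until one reply line arrives."""
    proc.stdin.write(text.encode('utf-8') + b'\n')  # don't forget the newline
    proc.stdin.flush()                              # push it out of our buffer
    return proc.stdout.readline().decode('utf-8').strip()


def run_session():
    proc = subprocess.Popen([sys.executable, '-c', CHILD_CODE],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE)
    try:
        replies = [ask(proc, 'hello'), ask(proc, 'world')]
    finally:
        # Closing stdin is this child's "quit" signal; kill only as a fallback.
        proc.stdin.close()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        proc.stdout.close()
    return replies, proc.returncode
```

Because the child is given a chance to exit by itself, its return code here should be 0 rather than a negative signal number.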

Interact using non-blocking reads and stoppable threads

The final sample demonstrates a slightly more advanced scenario. Suppose we're testing a long-lived socket server, and we're interested in orchestrating complex interactions with it, perhaps with multiple concurrent clients. We'll also want a clean shut-down of the whole setup of threads and child processes. The full code sample is here; what follows is a couple of representative snippets. The key ingredient is this socket reading function, meant to be run in its own thread:

def socket_reader(sockobj, outq, exit_event):
    while not exit_event.is_set():
        try:
            buf = sockobj.recv(1)
            if len(buf) < 1:
                break
            outq.put(buf)
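        except socket.timeout:
            # No data within the socket's timeout; re-check exit_event.
            continue
        except OSError:
            # The socket was closed or failed; stop reading.
            break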

Best used with a socket that has a timeout set on it, this function will repeatedly monitor the socket for new data and push everything it receives [2] into outq, which is a queue.Queue. The function exits when either the socket is closed (recv returns an empty bytestring), or when exit_event (a threading.Event) is set by the caller.

The caller can launch this function in a thread and occasionally try to read new items from the queue in a non-blocking way:

while True:
    try:
        v = outq.get(block=False)
        print(v)
    except queue.Empty:
        # Nothing more queued right now; stop and try again later.
        break

When all is done, the caller can set the exit Event to stop the thread (the thread will stop on its own if the socket it's reading from is closed, but the event lets us control this more directly).
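Putting the pieces together, the following sketch exercises socket_reader without a real server by using socket.socketpair() as a stand-in for the connection. The `demo` function, the 0.1 second timeout and the payload are assumptions chosen for illustration; socket_reader is repeated so the snippet is self-contained:

```python
import queue
import socket
import threading


def socket_reader(sockobj, outq, exit_event):
    # Same logic as the function shown earlier.
    while not exit_event.is_set():
        try:
            buf = sockobj.recv(1)
            if len(buf) < 1:
                break
            outq.put(buf)
        except socket.timeout:
            continue
        except OSError:
            break


def demo():
    """Feed 4 bytes through a socket pair; return (received, thread_alive)."""
    writer, reader = socket.socketpair()
    reader.settimeout(0.1)  # lets the thread notice exit_event regularly
    outq = queue.Queue()
    exit_event = threading.Event()
    t = threading.Thread(target=socket_reader,
                         args=(reader, outq, exit_event))
    t.start()
    try:
        writer.sendall(b'ping')
        # The reader pushes one byte per queue item, so collect four items.
        received = b''.join(outq.get(timeout=2) for _ in range(4))
    finally:
        exit_event.set()    # ask the thread to stop
        t.join(timeout=2)
        writer.close()
        reader.close()
    return received, t.is_alive()
```

Note that the sockets are closed only after the join, so the thread stops because of the event rather than because of a closed socket; swapping the order would exercise the other exit path.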

Final words

There's no single one-size-fits-all solution for the task described in this post; I presented a bunch of recipes to handle the more commonly occurring situations, but specific use cases may still not be addressed by them. Please let me know if you run into an interesting use case these recipes helped (or did not help!) resolve. Any other feedback is also welcome, as usual.