art with code

2008-10-05

I/O in programming languages: piping to processes

This post is a part of a series where your intrepid host looks at I/O in different programming languages in search of understanding and interesting abstractions.

Part 1: open and read
Part 2: writing
Part 3: basic piping
Part 4: piping to processes -- you are here
Part 5: structs and serialization

Continuing from piping files, today we pipe to external processes. Piping data between programs is one of the basic tenets of Unix. A second basic tenet is "everything is a file." The two aren't really that different, though: writing to a file is roughly equivalent to writing data to a filesystem program (if you thought of FUSE just now, have a cookie).

Bash is built for process-to-process interaction, so it's very easy to write pipelines and do I/O redirection. To demonstrate, here's a pipeline that builds a histogram of some HTTP return codes. It has five processes talking to each other (the fifth being bash itself):

cat /var/log/apache2/access.log | egrep -o ' (200|302|304) ' | sort | uniq -c


But how does that work at the low level -- how does Bash build such pipelines? Let's find out.

To write data to a program, we need to have a pipe such that we hold the write end and the other program holds the read end. The pipe syscall gives us the two ends of a pipe, which we can then pass to a forked process. Here's a pipe syscall example in x86 assembler (32-bit this time.)

First, just the syscall:

.equ PIPE, 42
.section .bss
.lcomm pipes, 8
# ...
# The pipe syscall.
# Writes the pipe file descriptors to the array pointed to by ebx.
movl $PIPE, %eax
movl $pipes, %ebx
int $0x80


Then a small demo program that creates a pipe, writes a string to its input, reads the string from its output, and prints out the read string (...hey, I had to come up with some kind of simple test):

# Define some globals
.equ EXIT, 1
.equ READ, 3
.equ WRITE, 4
.equ CLOSE, 6
.equ PIPE, 42

.equ STDOUT, 1

.equ PIPE_READ, 0
.equ PIPE_WRITE, 4

.section .data # static memory segment
hello:
.ascii "Hello\n"

.section .bss # uninitialized (zero-filled) static memory segment
.lcomm buf, 6 # buffer to read into
.lcomm pipes, 8 # array for the pipe file descriptors (4 bytes each)

.section .text # program segment
.globl _start
_start:
# The pipe syscall.
# Writes the pipe file descriptors to the array pointed to by ebx.
movl $PIPE, %eax
movl $pipes, %ebx
int $0x80

# Write 6 bytes from hello to the input pipe.
movl $WRITE, %eax
movl $pipes, %ecx
movl PIPE_WRITE(%ecx), %ebx
movl $hello, %ecx
movl $6, %edx
int $0x80

# Close the input pipe. The fd is already in ebx.
movl $CLOSE, %eax
int $0x80

# Read 6 bytes from the output pipe into buf. Buf should equal hello after this.
movl $READ, %eax
movl $pipes, %ecx
movl PIPE_READ(%ecx), %ebx
movl $buf, %ecx
movl $6, %edx
int $0x80

# Close the output pipe.
movl $CLOSE, %eax
int $0x80

# Write the contents of buf to stdout. Should print "Hello" with a linebreak.
movl $WRITE, %eax
movl $STDOUT, %ebx
movl $buf, %ecx
movl $6, %edx
int $0x80

# And we're done, exit with a zero.
movl $EXIT, %eax
movl $0, %ebx
int $0x80
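
For comparison, here is roughly the same round trip in Python, as a minimal sketch rather than a line-by-line translation of the assembly. os.pipe wraps the same pipe syscall and returns the read end first, then the write end. The helper name pipe_roundtrip is made up for this example, and it assumes the data fits in the kernel's pipe buffer, since nothing is reading while we write.

```python
import os

def pipe_roundtrip(data):
    """Write data into a fresh pipe and read it back out of the other end."""
    read_fd, write_fd = os.pipe()        # (read end, write end)
    os.write(write_fd, data)             # would block if data exceeded the pipe buffer
    os.close(write_fd)                   # done writing
    received = os.read(read_fd, len(data))
    os.close(read_fd)
    return received

print(pipe_roundtrip(b"Hello\n"))
```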


Now, to talk to an external process, we first need two pipes. Then we need to fork a new process and reopen its stdin and stdout to point to our pipes. Finally, the child calls exec and we use our ends of the pipes to talk to the new process. Here's a C example that does the equivalent of echo 'Hello there!' | tr '[:lower:]' '[:upper:]'. It even has error checking in parts:


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>

/* forks the command and returns pipes connected to the command's stdin and stdout */

pid_t popen2(const char *command, char *const argv[], FILE **readable, FILE **writable)
{
    int fds_writable[2];
    int fds_readable[2];
    pid_t pid;

    // make two pipes, writable is connected to child stdin, readable to child stdout
    if (pipe(fds_writable) == -1) exit(1);
    if (pipe(fds_readable) == -1) exit(1);

    pid = fork();
    if (pid == -1) exit(2);

    if (pid == 0) { // child process
        // close the pipe ends used by the parent
        if (close(fds_readable[0]) == -1) exit(3);
        if (close(fds_writable[1]) == -1) exit(3);

        if (dup2(fds_readable[1], STDOUT_FILENO) == -1) exit(4); // connect to stdout
        if (dup2(fds_writable[0], STDIN_FILENO) == -1) exit(4); // connect to stdin
        execv(command, argv); // replace the current process with the command
        exit(1); // only reached if execv failed
    }

    // close the pipe ends used by the child
    if (close(fds_readable[1]) == -1) exit(3);
    if (close(fds_writable[0]) == -1) exit(3);

    *readable = fdopen(fds_readable[0], "r");
    *writable = fdopen(fds_writable[1], "w");
    return pid;
}

int main(int argc, char *argv[])
{
    FILE *r, *w;
    char command[] = "/usr/bin/tr",
         message[] = "Hello there!";
    char *args[] = { "tr", "[:lower:]", "[:upper:]", NULL };
    char *buf;

    int len = strlen(message);
    pid_t pid = popen2(command, args, &r, &w);

    buf = (char*)malloc(20);

    fwrite(message, 1, len, w);
    fclose(w); // sends EOF to the child's stdin

    if (fgets(buf, 20, r) == NULL) buf[0] = '\0'; // don't print garbage on a failed read
    fclose(r);

    waitpid(pid, NULL, 0); // reap the child; wait() takes a status pointer, not a pid
    puts(buf);

    free(buf);

    return 0;
}
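
The same dance can be written with Python's os module, which exposes pipe, fork, dup2 and execv more or less directly. What follows is a sketch that mirrors the steps of the C version, not a replacement for a real process library. The shout helper is made up for the example, and /usr/bin/tr is assumed to exist, as in the C code.

```python
import os

def popen2(command, argv):
    """Fork command with pipes on its stdin and stdout.

    Returns (pid, readable, writable) where readable is the child's stdout
    and writable is the child's stdin.
    """
    stdin_r, stdin_w = os.pipe()      # parent writes, child reads
    stdout_r, stdout_w = os.pipe()    # child writes, parent reads
    pid = os.fork()
    if pid == 0:  # child process
        try:
            os.close(stdin_w)         # close the ends used by the parent
            os.close(stdout_r)
            os.dup2(stdin_r, 0)       # connect to stdin
            os.dup2(stdout_w, 1)      # connect to stdout
            os.execv(command, argv)   # replace the child with the command
        finally:
            os._exit(127)             # only reached if something above failed
    os.close(stdin_r)                 # close the ends used by the child
    os.close(stdout_w)
    return pid, os.fdopen(stdout_r, "rb"), os.fdopen(stdin_w, "wb")

def shout(message):
    """Hypothetical helper: pipe message (bytes) through tr to upper-case it."""
    pid, readable, writable = popen2("/usr/bin/tr", ["tr", "[:lower:]", "[:upper:]"])
    writable.write(message)
    writable.close()                  # EOF for tr
    result = readable.read()
    readable.close()
    os.waitpid(pid, 0)                # reap the child
    return result
```

Calling shout(b"Hello there!") should hand back the upper-cased bytes, provided tr lives at that path.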


Whew, that was a pain. Which is why most languages built for Unix scripting have an equivalent of popen2 in their standard library. For example, Ruby has IO.popen, simplifying our task significantly:

IO.popen("tr '[:lower:]' '[:upper:]'", "r+"){|pipe|
pipe.write "Hello there!"
pipe.close_write
puts pipe.read
}


As even that is often cumbersome, Ruby and Perl have a backtick operator that does something like popen("/bin/sh -c " + cmd, "r"){|p| p.read }. The following works fine in both Ruby and Perl:

print(`echo 'Hello there!' | tr '[:lower:]' '[:upper:]'`)
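
Python has no backtick operator, but subprocess.run with shell=True does roughly the same job: it hands the string to /bin/sh and captures what comes out. A minimal sketch, assuming Python 3.7 or later (for the text parameter); the function name backtick is made up for this example.

```python
import subprocess

def backtick(cmd):
    """Run cmd through the shell and return its stdout as a string."""
    result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, text=True)
    return result.stdout

print(backtick("echo 'Hello there!' | tr '[:lower:]' '[:upper:]'"), end="")
```

As with backticks, the command string goes through the shell unescaped, so this is only suitable for trusted input.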


In the OCaml prelude.ml library, I have utilities to do similar things:

let () =
withCmd ["tr"; "[:lower:]"; "[:upper:]"] (fun ic oc ->
output_string oc "Hello there!";
close_out oc;
puts (readAll ic))


There's even a backtick equivalent:

let () = puts (readRawCmd "echo 'Hello there!' | tr '[:lower:]' '[:upper:]'")


Here is the implementation of withCmd and friends. It's got a base abstraction with a convenience garden built on top:

let withRawCmd cmd f =
let ic,oc = Unix.open_process cmd in
finally (fun _ -> close_out_noerr oc; close_in_noerr ic)
(f ic) oc

let withRawCmdStdin args f = withRawCmd args (fun ic oc -> close_in_noerr ic; f oc)
let withRawCmdStdout args f = withRawCmd args (fun ic oc -> close_out_noerr oc; f ic)

let withCmd args = withRawCmd (escape_cmd args)
let withCmdStdin args = withRawCmdStdin (escape_cmd args)
let withCmdStdout args = withRawCmdStdout (escape_cmd args)

let readCmd args = withCmdStdout args readAll
let readRawCmd args = withRawCmdStdout args readAll


Let's build a multi-process pipeline next. What we want is a way to send all data from one pipe to another. We could spawn a thread for each connection, reading from one process and writing to another, but that's a bit of a pain to implement. Routing the data through the parent process also causes extra copying, which we would like to avoid. Let's do the piping like Bash and start the processes in sequence, setting each process's stdin to be the stdout of the previous process.

First we build a pipeline from a list of commands. The pipeline builder is a fold over commands, accumulating a pid list and the previous command's stdout. The body of the fold is a connector that connects the current command's stdin to the previous command's stdout. The connector takes a command and a read-pipe and returns a pid and a read-pipe. The pipeline builder returns the list of pids in the pipeline and a read-pipe hooked to the end of the pipeline.

After executing the pipeline, we iterate over the pids and wait on them to get rid of the [defunct] processes that would otherwise be left hanging around. Sounds easy enough, let's get cracking!

I used OCaml this time, so there's some Unix-vs-channels junk in there.

First some generic fork + dup2 utils from unix.ml:

open Prelude

(* adapted from unix.ml *)
let try_set_close_on_exec fd =
try Unix.set_close_on_exec fd; true with Invalid_argument _ -> false

let open_proc cmd input output toclose =
let cloexec = List.for_all try_set_close_on_exec toclose in
match Unix.fork () with
0 -> if input <> Unix.stdin then begin Unix.dup2 input Unix.stdin; Unix.close input end;
if output <> Unix.stdout then begin Unix.dup2 output Unix.stdout; Unix.close output end;
if not cloexec then List.iter Unix.close toclose;
begin try Unix.execvp (head cmd) (Array.of_list cmd)
with _ -> exit 127
end
| id -> id

Then a function to create a pipe segment:

(* Fork cmd, setting cmd's stdin to read_fd. Return the cmd's stdout. *)
let popenWithStdin ?(toclose=[]) read_fd cmd =
let (in_read, in_write) = Unix.pipe () in
let pid = open_proc cmd read_fd in_write (in_read::toclose) in
Unix.close in_write;
Unix.close read_fd;
(pid, in_read)

And the pipeline builder and a withPipeline function for creating and running pipelines:

let buildPipeline ?toclose ifd =
foldl (fun (pids, ifd) cmd ->
let pid,ifd = popenWithStdin ?toclose ifd cmd in
(pid::pids, ifd))
([], ifd)

let withPipeline cmds f =
let ifd, ofd = Unix.pipe () in
(* if we don't close ofd in every process, all will hang *)
let pids, ifd = buildPipeline ~toclose:[ofd] ifd cmds in
let ic = Unix.in_channel_of_descr ifd
and oc = Unix.out_channel_of_descr ofd in
finally (fun _ -> close_out_noerr oc;
close_in_noerr ic;
iter (fun pid -> ignore (Unix.waitpid [] pid)) pids)
(f ic) oc

And a small main program to test it out:

let () =
let pipeline = [
["tr"; "[:upper:]"; "[:lower:]"];
["grep"; "-o"; "ello"];
] in
withPipeline pipeline
(fun ic oc ->
output_string oc "hElLo JeLlo";
close_out oc;
puts (readAll ic)
)

That was quite educational to write. I spent hours debugging a bug that was caused by not closing the withPipeline ofd in the subprocesses. You really should close all unneeded fds before exec.
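
The reason a stray fd causes a hang is that a reader only sees EOF once every copy of the pipe's write end is closed. Here's a small Python sketch of that behaviour, in a single process for simplicity: os.dup stands in for the copy a forked child would inherit, and non-blocking reads are used so that the demo reports "would block" instead of actually hanging. The names try_read and leaked_writer_demo are made up for this example.

```python
import os

def try_read(fd):
    """Non-blocking read: b"" means EOF, None means the read would have blocked."""
    os.set_blocking(fd, False)
    try:
        return os.read(fd, 1024)
    except BlockingIOError:
        return None

def leaked_writer_demo():
    read_fd, write_fd = os.pipe()
    leaked_fd = os.dup(write_fd)       # stands in for a copy inherited by a child
    os.write(write_fd, b"data")
    os.close(write_fd)                 # the writer we know about is done
    first = try_read(read_fd)          # the data that was written
    while_leaked = try_read(read_fd)   # no EOF yet: a write end is still open
    os.close(leaked_fd)
    after_close = try_read(read_fd)    # now the reader sees EOF
    os.close(read_fd)
    return first, while_leaked, after_close

print(leaked_writer_demo())
```

With blocking reads, the second read is where a pipeline would sit forever, waiting for an EOF that never comes.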

Speaking of which, Python's subprocess module is nice. It takes care of the hard parts. It even closes file descriptors. All we have to do is snap the processes together:

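from subprocess import Popen, PIPE

p1 = Popen(["tr", "[:upper:]", "[:lower:]"], stdin=PIPE, stdout=PIPE, close_fds=True)
p2 = Popen(["grep", "-o", "ello"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdout.close()  # p2 has its own copy; the parent doesn't read from p1
p1.stdin.write(b"hElLo JeLlo")  # pipes carry bytes in Python 3
p1.stdin.close()
output = p2.stdout.read()
p2.stdout.close()
p1.wait()  # reap both children
p2.wait()
print(output.decode())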
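
The snap-together step generalizes to the fold described earlier: each step starts a command reading from the previous stdout and passes its own stdout along. This is a rough sketch assuming Python 3, not a port of the OCaml code. The names connect and run_pipeline are made up for this example, and since it writes all input before reading any output, it is only safe for inputs small enough to fit in the pipe buffers.

```python
import os
from functools import reduce
from subprocess import Popen, PIPE

def connect(state, cmd):
    """Fold step: start cmd with stdin wired to the previous read end."""
    procs, read_end = state
    proc = Popen(cmd, stdin=read_end, stdout=PIPE)
    read_end.close()                   # the child has its own copy now
    return procs + [proc], proc.stdout

def run_pipeline(commands, data):
    """Feed data (bytes) into the first command, return the last one's output."""
    head_read, head_write = os.pipe()
    # Popen closes other fds in the children by default in Python 3,
    # so head_write does not leak into the pipeline.
    procs, tail = reduce(connect, commands, ([], os.fdopen(head_read, "rb")))
    with os.fdopen(head_write, "wb") as head:
        head.write(data)               # closing head sends EOF down the pipeline
    output = tail.read()
    tail.close()
    for proc in procs:                 # reap the children
        proc.wait()
    return output

pipeline = [["tr", "[:upper:]", "[:lower:]"], ["grep", "-o", "ello"]]
print(run_pipeline(pipeline, b"hElLo JeLlo").decode(), end="")
```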


And that's it for today. The example programs got a bit too long for my liking; I just hope that they weren't too onerous to read. I thought of doing a Haskell example as well, but writing one pipe-passing popen fold is quite enough for one day, thank you very much. Structured I/O next (as in, reading something other than arrays of bytes).
