Read non-blocking from multiple fifos in parallel

Solution 1:

This solution will only work if the number of FIFOs is smaller than the number of jobs that GNU parallel can run in parallel (which is limited by the number of file handles and processes):

parallel -j0 --line-buffer cat ::: fifo*

The parallel command above seems to be able to move up to 500 MB/s:

window1$ mkfifo {1..100}
window1$ parallel -j0 --line-buffer cat ::: {1..100} | pv >/dev/null

window2$ parallel -j0 'cat bigfile > ' ::: *

And it does not mix half-lines:

window1$ mkfifo {1..100}
window1$ parallel -j0 --line-buffer cat ::: {1..100} &

window2$ parallel -j0 'traceroute {}.1.1.1 > {}' ::: *

GNU parallel also reads the jobs in parallel (it does not read one job completely before moving on to the next):

window1$ mkfifo {1..100}
window1$ parallel -j0 --line-buffer cat ::: * > >(tr -s ABCabc)

window2$ long_lines_with_pause() {
    perl -e 'print STDOUT "a"x30_000_000," "'
    perl -e 'print STDOUT "b"x30_000_000," "'
    perl -e 'print STDOUT "c"x30_000_000," "'
    echo "$1"
    sleep 2
    perl -e 'print STDOUT "A"x30_000_000," "'
    perl -e 'print STDOUT "B"x30_000_000," "'
    perl -e 'print STDOUT "C"x30_000_000," "'
    echo "$1"
}
window2$ export -f long_lines_with_pause
window2$ parallel -j0 'long_lines_with_pause {} > {}' ::: *

Here, many 'a b c' lines (the first half of each job) will be printed before the 'A B C' lines (the second half of each job).

Solution 2:

Using coreutils tail,

tail -q -n+1 -f --pid=stop-tail-when-this-is-gone fifo1 fifo2 fifo3

almost works (as noted in the comments on an earlier version of this answer), though you may need to run "for f in fifo*; do cat < /dev/null > $f & done" beforehand so that every FIFO has a pending writer: coreutils tail opens them O_RDONLY without O_NONBLOCK, so each open blocks until some process opens that FIFO for writing.

Unfortunately, tail has a bug: it is careful about line/record endings only for a pipe on stdin, not for named pipes/FIFOs given as arguments, so a record from one FIFO can be split by data from another. Someday someone may fix coreutils tail.

In the interim, to get a true multi-producer/single-consumer (fan-in) queue that honors line endings, you can use a simple 100-ish-line C program I call tailpipes.c:

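#define _GNU_SOURCE    //Needed for memrchr on glibc; must precede all includes
#include <stdio.h>
#include <stdlib.h>
#include <string.h>    //TODO: Find&document build environments lacking memrchr
#include <strings.h>   //strcasecmp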
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#define errstr strerror(errno)

char const * const Use = "%s: %s\n\nUsage:\n\n"
"  %s [-p PID] [-o OPEN_MODE(RW)] [-d DLM(\\n)] [-s SEC(.01)] PATH1 PATH2..\n\n"
"Read delimited records (lines by default) from all input paths, writing only\n"
"complete records to stdout and changing to a stop-at-EOF mode upon receiving\n"
"SIGHUP (unlike \"tail -fqn+1\" which just dies) OR when we first notice that\n"
"PID does not exist (if PID is given).  Since by default fifos are opened RW,\n"
"signal/PID termination is needed to not loop forever, but said FIFOs may be\n"
"closed & reopened by other processes as often as is convenient. For one-shot\n"
"writing style, ending input reads at the first EOF, use \"-oRO\".  Also, DLM\n"
"adjusts the record delimiter byte from the default newline, and SEC adjusts\n"
"max select sleep time.  Any improperly terminated final records are sent to\n"
"stderr at the end of execution (with a label and bracketing).\n";

volatile sig_atomic_t writer_done;          //Set asynchronously by sig()
void sig(int signum) { (void)signum; writer_done = 1; }

int main(int N, char *V[]) {
    int             ch;                     //getopt() returns an int
    char           *buf[N-1], delim = '\n', *V0 = V[0], *eol;
    int             len[N-1], fds[N-1], nBf[N-1], i, fdMx = 0, nS = 0, nF = 0,
                    oFlags = O_RDWR;
    pid_t           pid = 0;
    ssize_t         nR, nW;
    struct timespec tmOut = { 0, 10000000 }; //10 ms select time out
    fd_set          fdRdMaster, fdRd;
    //If we get signaled before here, this program dies and data may be lost.
    //If possible use -p PID option w/pre-extant PID of appropriate lifetime.
    signal(SIGHUP, sig);                    //Install sig() for SIGHUP
    memset((void *)fds, 0, sizeof fds);
    memset((void *)len, 0, sizeof len);
    FD_ZERO(&fdRdMaster);
    fdRd = fdRdMaster;
    while ((ch = getopt(N, V, "d:p:s:o:")) != -1)
        switch (ch) {                       //For \0 do '' as a sep CLI arg
            double tO;
            case 'd': delim  = optarg ? *optarg : '\n';   break;
            case 'p': pid    = optarg ? atoi(optarg) : 0; break;
            case 's': tO = optarg ? atof(optarg) : .01;
                      tmOut.tv_sec = (long)tO;
                      tmOut.tv_nsec = 1e9 * (tO - tmOut.tv_sec);
                      break;
            case 'o': oFlags = (optarg && strcasecmp(optarg, "ro") == 0) ?
                                 O_RDONLY | O_NONBLOCK : O_RDWR;
                      break;
            default: return fprintf(stderr, Use, V0, "bad option", V0), 1;
        }
    V += optind; N -= optind;               //Shift off option args
    if (N < 1)
        return fprintf(stderr, Use, V0, "too few arguments", V0), 2;
    setvbuf(stdout, NULL, _IONBF, 0);       //Unbuffered; size arg is ignored
    for (i = 0; i < N; i++)                 //Check for any available V[]
        if ((fds[i] = open(V[i], oFlags)) != -1) {
            struct stat st;
            fstat(fds[i], &st);
            if (!S_ISFIFO(st.st_mode))
                return fprintf(stderr,"%s: %s not a named pipe\n", V0, V[i]), 3;
            nF++;
            FD_SET(fds[i], &fdRdMaster);    //Add fd to master copy for pselect
            if (!(buf[i] = malloc(nBf[i] = 4096)))
                return fprintf(stderr, "%s: out of memory\n", V0), 7;
            if (fds[i] > fdMx)
                fdMx = fds[i];
        } else if (errno == EINTR) {        //We may get signaled to finish up..
            i--; continue;                  //..before we even get this far.
        } else
            return fprintf(stderr, "%s: open(%s): %s\n", V0, V[i], errstr), 3;
    fdMx++;
    fdRd = fdRdMaster;
    while (nF && (nS = pselect(fdMx, &fdRd, NULL, NULL, &tmOut, NULL)) != -99) {
        if (pid && kill(pid, 0) != 0 && errno != EPERM) //Given pid didn't exist
            writer_done = 1;
        if (nS == 0 && writer_done)                     //No input & no writers
            break;
        else if (nS == -1) {                            //Some select error:
            if (errno != EINTR && errno != EAGAIN)      //..fatal or retry
                return fprintf(stderr, "%s: select: %s\n", V0, errstr), 4;
            continue;
        }
        for (i = 0; nS > 0 && i < N; i++) {             //For all fds..
            if (fds[i] < 0 || !FD_ISSET(fds[i], &fdRd)) //with readable data
                continue;
            if ((nR = read(fds[i], buf[i]+len[i], nBf[i] - len[i])) < 0) {
                if (errno != EAGAIN && errno != EINTR)
                    fprintf(stderr, "%s: read: %s\n", V0, errstr);
                continue;
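            } else if (oFlags == (O_RDONLY | O_NONBLOCK) && nR == 0) {
                FD_CLR(fds[i], &fdRdMaster);            //EOF: stop watching fd,
                close(fds[i]);                          //but keep buf[i] so any
                fds[i] = -1;                            //residual record still
                nF--;                                   //reaches stderr at exit
                continue;
            }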
            len[i] += nR;                               //Update Re: read data
            if ((eol = memrchr(buf[i], delim, len[i]))) {
                nW = eol - buf[i] + 1;                  //Only to last delim
                if (fwrite(buf[i], nW, 1, stdout) == 1) {
                    memmove(buf[i], buf[i] + nW, len[i] - nW);
                    len[i] -= nW;                       //Residual buffer shift
                } else
                    return fprintf(stderr, "%s: %zd bytes->stdout failed: %s\n",
                                   V0, nW, errstr), 5;
            } else if (len[i] == nBf[i]) {              //NoDelim&FullBuf=>GROW
                void *tmp;
                if (nBf[i] >= 1 << 30)
                    return fprintf(stderr, "%s: record > 1 GiB\n", V0), 6;
                nBf[i] *= 2;
                if (!(tmp = realloc(buf[i], nBf[i])))
                    return fprintf(stderr,"%s: out of memory\n", V0), 7;
                buf[i] = tmp;
            }
        }
        fdRd = fdRdMaster;
    }
    for (i = 0; i < N; i++)                     //Ensure any residual data is..
        if (len[i] > 0) {                       //..labeled,bracketed,=>stderr.
            fprintf(stderr, "%s: %s: final unterminated record: {", V0, V[i]);
            fwrite(buf[i], len[i], 1, stderr);
            fputs("}\n", stderr);
        }
    return 0;
}

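To install, cut and paste the code into tailpipes.c and compile it with "cc -O2 tailpipes.c -o somewhere-in-$PATH/tailpipes" (any optimization level will do). Tested on Linux & FreeBSD. I get about 2500e6 bytes/sec of output, but my machine's memory may simply be faster than that of the box that measured 500e6 bytes/sec.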

The algorithm is roughly as suggested, but more general. O_NONBLOCK is only needed with O_RDONLY. A few defaults are there for ease of use: FIFOs are opened O_RDWR so that writers can close and reopen them many times, and -p PID tracking gives a race-free termination protocol. You can pass -oRO to stop at EOF instead if you want. tailpipes also handles incomplete lines at program termination, sending them labeled and bracketed to stderr, in case some easy post-processing can make the records whole or logs of them would be useful for debugging.

Example usage: GNU xargs can be the single-consumer, multi-producer (fan-out) part of a map-reduce-ish parallel pipeline, with tailpipes as the record-boundary-honoring fan-in part, all without using disk space for temporary files:

export MYTEMP=$(mktemp -d /tmp/MYPROG.XXXXX)
FIFOs=`n=0; while [ $n -lt 8 ]; do echo $MYTEMP/$n; n=$((n+1)); done`
mkfifo $FIFOs
sleep 2147483647 & p=$!       #Cannot know xargs pid is good for long
( find . -print0 | xargs -0 -P8 --process-slot-var=MYSLOT MYPROGRAM
  kill $p ) &                 #Inform tailpipes writers are done
tailpipes -p$p $FIFOs | CONSUMING-PIPELINE
rm -rf $MYTEMP
wait                          #Wait for xargs subshell to finish

In the above, it is important that A) n goes from 0 up to one less than the -P value (0..7 for -P8), because that is the numbering xargs uses for MYSLOT, and B) MYPROGRAM directs its output to the file keyed by its newly assigned $MYSLOT, i.e. $MYTEMP/$MYSLOT (e.g. exec > $MYTEMP/$MYSLOT if MYPROGRAM is a shell script). The shell/program wrapper could be eliminated in many cases if xargs had a hypothetical --process-slot-out option to set up its children's stdouts.