Is there an intra-process local pipe in Qt?

Does Qt have a QIODevice pair that would work for intra-process point-to-point communications?

One could use the concrete QTCPSocket or QLocalSocket, but the server-side connection API is a bit cumbersome, and it seems wasteful to force the data through the OS.


Solution 1:

The following is a usable, rudimentary implementation. It uses an internal signal-slot pair to push the data to the other endpoint. That way either end of the connection can live in any thread, and the ends can be moved between threads without losing data or inducing any races.

The private QRingBuffer is used in lieu of reinventing the wheel. Add QT += core-private to the .pro file to make that available. Qt PIMPL is used to gain access to the internal device buffers in Qt versions 5.7 and above.

If you wish to instantiate an open pipe, as may often be the case, you can pass the I/O mode to the constructor. Typical use:

int main(/*…*/)
{
   /*…*/
   AppPipe end1 { QIODevice::ReadWrite };
   AppPipe end2 { &end1, QIODevice::ReadWrite };
   AppPipe end3 { &end1, QIODevice::ReadOnly };
   // the pipes are open ready to use
   /*…*/
}

Whatever you write to one pipe, ends up as readable data in the other, connected pipes, and vice versa. In the above example, data written to end1 is readable from both end2 and end3 independently. Data written to end2 is readable from end1. end3 is effectively a listen-only pipe. Connection of additional pipes is cheap - there's no O(N) cost associated with sending big chunks of data to multiple pipes, because read QRingBuffers of connected pipes store shallow copies of entire byte arrays sent from the originating pipe.

All of the QIODevice semantics hold - you can connect to the readyRead signal, use the pipe with a QDataStream or a QTextStream, etc. As with any QIODevice, you can only use the class from its thread(), but the other endpoint can live in any thread, and both can be moved between threads as desired, without loss of data.

If the other pipe end is not open and readable, the writes are no-ops, even though they succeed. Closing the pipe clears the read and write buffers, so that it can be re-opened for reuse.

The pipe buffers write data by default, and the write buffer can be forcibly flushed using AppPipe::flush(), unless opened in QIODevice::Unbuffered mode.

The hasIncoming and hasOutgoing signals are useful in monitoring the data going over the pipe.

// https://github.com/KubaO/stackoverflown/tree/master/questions/local-pipe-32317081
// This project is compatible with Qt 4 and Qt 5
#include <QtTest>
#include <private/qiodevice_p.h>
#include <private/qringbuffer_p.h>
#include <algorithm>
#include <climits>

#ifndef Q_DECL_OVERRIDE
#define Q_DECL_OVERRIDE
#endif

class AppPipePrivate : public QIODevicePrivate {
public:
#if QT_VERSION < QT_VERSION_CHECK(5,7,0)
   QRingBuffer buffer;
   QRingBuffer writeBuffer;
   int writeBufferChunkSize;
#endif
   const QByteArray *writeData;
   AppPipePrivate() : writeData(0) { writeBufferChunkSize = 4096; }
};

/// A simple point-to-point intra-process pipe. The other endpoint can live in any
/// thread.
class AppPipe : public QIODevice {
   Q_OBJECT
   Q_DECLARE_PRIVATE(AppPipe)
   static inline int intLen(qint64 len) { return std::min(len, qint64(INT_MAX)); }
   Q_SLOT void _a_write(const QByteArray &data) {
      Q_D(AppPipe);
      if (!(d->openMode & QIODevice::ReadOnly)) return; // We must be readable.
      d->buffer.append(data); // This is a chunk shipped from the source.
      emit hasIncoming(data);
      emit readyRead();
   }
   void hasOutgoingLong(const char *data, qint64 len) {
      while (len) {
         int const size = intLen(len);
         emit hasOutgoing(QByteArray(data, size));
         data += size;
         len -= size;
      }
   }
public:
   AppPipe(QIODevice::OpenMode mode, QObject *parent = 0) :
      QIODevice(*new AppPipePrivate, parent) {
      open(mode);
   }
   AppPipe(AppPipe *other, QIODevice::OpenMode mode, QObject *parent = 0) :
      QIODevice(*new AppPipePrivate, parent) {
      open(mode);
      addOther(other);
   }
   AppPipe(AppPipe *other, QObject *parent = 0) :
      QIODevice(*new AppPipePrivate, parent) {
      addOther(other);
   }
   ~AppPipe() Q_DECL_OVERRIDE {}
   void addOther(AppPipe *other) {
      if (other) {
         connect(this, SIGNAL(hasOutgoing(QByteArray)), other, SLOT(_a_write(QByteArray)), Qt::UniqueConnection);
         connect(other, SIGNAL(hasOutgoing(QByteArray)), this, SLOT(_a_write(QByteArray)), Qt::UniqueConnection);
      }
   }
   void removeOther(AppPipe *other) {
      disconnect(this, SIGNAL(hasOutgoing(QByteArray)), other, SLOT(_a_write(QByteArray)));
      disconnect(other, SIGNAL(hasOutgoing(QByteArray)), this, SLOT(_a_write(QByteArray)));
   }
   void flush() {
      Q_D(AppPipe);
      while (!d->writeBuffer.isEmpty()) {
         QByteArray const data = d->writeBuffer.read();
         emit hasOutgoing(data);
         emit bytesWritten(data.size());
      }
   }
   void close() Q_DECL_OVERRIDE {
      Q_D(AppPipe);
      flush();
      QIODevice::close();
      d->buffer.clear();
   }
   qint64 write(const QByteArray &data) { // This is an optional optimization. The base method works OK.
      Q_D(AppPipe);
      QScopedValueRollback<const QByteArray*> back(d->writeData);
      if (!(d->openMode & Text))
         d->writeData = &data;
      return QIODevice::write(data);
   }
   qint64 writeData(const char *data, qint64 len) Q_DECL_OVERRIDE {
      Q_D(AppPipe);
      bool buffered = !(d->openMode & Unbuffered);
      if (buffered && (d->writeBuffer.size() + len) > d->writeBufferChunkSize)
         flush();
      if (!buffered
          || len > d->writeBufferChunkSize
          || (len == d->writeBufferChunkSize && d->writeBuffer.isEmpty()))
      {
         if (d->writeData && d->writeData->data() == data && d->writeData->size() == len)
            emit hasOutgoing(*d->writeData);
         else
            hasOutgoingLong(data, len);
      }
      else
         memcpy(d->writeBuffer.reserve(len), data, len);
      return len;
   }
   bool isSequential() const Q_DECL_OVERRIDE { return true; }
   Q_SIGNAL void hasOutgoing(const QByteArray &);
   Q_SIGNAL void hasIncoming(const QByteArray &);
#if QT_VERSION >= QT_VERSION_CHECK(5,7,0)
   // all the data is in the read buffer already
   qint64 readData(char *, qint64) Q_DECL_OVERRIDE { return 0; }
#else
   qint64 readData(char *data, qint64 len) Q_DECL_OVERRIDE {
      Q_D(AppPipe);
      qint64 hadRead = 0;
      while (len && !d->buffer.isEmpty()) {
         int size = d->buffer.read(data, intLen(len));
         hadRead += size;
         data += size;
         len -= size;
      }
      return hadRead;
   }
   bool canReadLine() const Q_DECL_OVERRIDE {
      Q_D(const AppPipe);
      return d->buffer.indexOf('\n') != -1 || QIODevice::canReadLine();
   }
   qint64 bytesAvailable() const Q_DECL_OVERRIDE {
      Q_D(const AppPipe);
      return QIODevice::bytesAvailable() + d->buffer.size();
   }
   qint64 bytesToWrite() const Q_DECL_OVERRIDE {
      Q_D(const AppPipe);
      return QIODevice::bytesToWrite() + d->writeBuffer.size();
   }
#endif
};

// ...

#include "main.moc"

A minimal test harness:

class TestAppPipe : public QObject {
   Q_OBJECT
   QByteArray data1, data2;
   struct PipePair {
      AppPipe end1, end2;
      PipePair(QIODevice::OpenMode mode = QIODevice::NotOpen) :
         end1(QIODevice::ReadWrite | mode), end2(&end1, QIODevice::ReadWrite | mode) {}
   };
   Q_SLOT void initTestCase() {
      data1 = randomData();
      data2 = randomData();
   }
   Q_SLOT void sizes() {
      QCOMPARE(sizeof(AppPipe), sizeof(QIODevice));
   }
   Q_SLOT void basic() {
      PipePair p;
      QVERIFY(p.end1.isOpen() && p.end1.isWritable() && p.end1.isReadable());
      QVERIFY(p.end2.isOpen() && p.end2.isWritable() && p.end2.isReadable());
      static const char hello[] = "Hello There!";
      p.end1.write(hello);
      p.end1.flush();
      QCOMPARE(p.end2.readAll(), QByteArray(hello));
   }
   static QByteArray randomData(int const size = 1024*1024*32) {
      QByteArray data;
      data.resize(size);
      char *const d = data.data();
      for (char *p = d+data.size()-1; p >= d; --p)
         *p = qrand();
      Q_ASSERT(data.size() == size);
      return data;
   }
   static void randomChunkWrite(AppPipe *dev, const QByteArray &payload) {
      for (int written = 0, left = payload.size(); left; ) {
         int const chunk = std::min(qrand() % 82931, left);
         dev->write(payload.mid(written, chunk));
         left -= chunk; written += chunk;
      }
      dev->flush();
   }
   void runBigData(PipePair &p) {
      Q_ASSERT(!data1.isEmpty() && !data2.isEmpty());
      randomChunkWrite(&p.end1, data1);
      randomChunkWrite(&p.end2, data2);
      QCOMPARE(p.end1.bytesAvailable(), qint64(data2.size()));
      QCOMPARE(p.end2.bytesAvailable(), qint64(data1.size()));
      QCOMPARE(p.end1.readAll(), data2);
      QCOMPARE(p.end2.readAll(), data1);
   }
   Q_SLOT void bigDataBuffered() {
      PipePair p;
      runBigData(p);
   }
   Q_SLOT void bigDataUnbuffered() {
      PipePair p(QIODevice::Unbuffered);
      runBigData(p);
   }
   Q_SLOT void cleanupTestCase() {
      data1.clear(); data2.clear();
   }
};

QTEST_MAIN(TestAppPipe)
# local-pipe-32317081.pro
QT = core
greaterThan(QT_MAJOR_VERSION, 4): QT = core-private testlib
else: CONFIG += qtestlib
DEFINES += \
  QT_DEPRECATED_WARNINGS \
  QT_DISABLE_DEPRECATED_BEFORE=0x060000 \
  QT_RESTRICTED_CAST_FROM_ASCII
CONFIG += console c++14
CONFIG -= app_bundle
TEMPLATE = app
SOURCES = main.cpp