I/O in a concurrent program

I'm working on a concurrent program with two threads: one listens for messages from a server, and the other sends messages to it. I need to read commands from the user (using cin?) and display messages coming from the server, both at the same time.

How can I handle this situation? The problem is that if a message arrives while I'm reading a command from the user, the message output gets interleaved with the user's partially typed input on the screen.

Thanks in advance


Some alternatives

  1. Have your command dump all the messages that have arrived since the last invocation of the command. That way the output is finite.

  2. Have your CLI command monitor all traffic continuously until Ctrl-C (or some other key combination) is pressed; then it reverts to your application's CLI prompt.

  3. Have your CLI command send the data to a file and monitor that file with a tail-like tool.
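
As a rough illustration of alternative 1, here is a minimal sketch of a buffer that the listener thread fills and the command drains. The class name MessageLog and its methods push() and drain() are made up for this example; nothing like it appears in the question, and a real program would still need to decide how to print the drained messages.

```cpp
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not from the question): collects server messages
// until the user explicitly asks for them.
class MessageLog {
  public:
    // called by the listener thread for every incoming message
    void push(std::string msg)
    {
      std::lock_guard<std::mutex> lock(_mtx);
      _pending.push_back(std::move(msg));
    }

    // called by the command handler; returns everything received since
    // the previous call and leaves the log empty
    std::vector<std::string> drain()
    {
      std::lock_guard<std::mutex> lock(_mtx);
      std::vector<std::string> out;
      out.swap(_pending);
      return out;
    }

  private:
    std::mutex _mtx;                    // guards _pending
    std::vector<std::string> _pending;  // messages not yet shown
};
```

Because the messages are only printed when the user runs the command, the listener thread never writes to the console while the user is typing.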


I took my old sample code and tried to turn it into an MCVE. ("Minimal" does not necessarily mean "short", does it?)

This is a very simple concept of a "shell" which supports one thread for input while multiple threads may do output.

  1. The keyboard input is read without echo. This is non-portable, so I provide two implementations of the function getChar(): one for MS Windows and one for everything else (which actually covers only *nix OSes). The latter is "strongly inspired" by SO: How to implement getch() function of C in Linux?.

  2. The input characters are stored in a std::string.

  3. The output erases the prompt and the current input text (by printing backspaces, then spaces, then backspaces again, which has the same effect as repeating "\b \b"), prints the output text (incl. a newline), and then prints the prompt and the current input buffer again.

The output is guarded by a mutex to ensure thread safety.
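
To make the erase step from point 3 concrete, here is a small sketch that builds the erase sequence as a string. The function name eraseSequence is only for illustration and is not part of miniShell.cc, which writes the same characters directly to std::cout.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper (the name is not from miniShell.cc): builds the
// character sequence that wipes `width` characters left of the cursor.
std::string eraseSequence(std::size_t width)
{
  return std::string(width, '\b')   // move the cursor back to the line start
       + std::string(width, ' ')    // overwrite the old text with blanks
       + std::string(width, '\b');  // move the cursor back again
}
```

For a prompt "> " with the input "st", width would be 4. This sketch assumes that prompt and input fit on a single terminal line, since a backspace typically does not move back across a wrapped line.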

This is the sample code miniShell.cc:

// system header:
#ifdef _WIN32
#include <conio.h>
#else // (not) _WIN32
#include <termios.h>
#include <unistd.h>
#include <stdio.h>
#endif // _WIN32

/// reads a character from console without echo.
#ifdef _WIN32
inline int getChar() { return _getch(); }
#else // (not) _WIN32
int getChar()
{
  struct termios oldattr;
  tcgetattr(STDIN_FILENO, &oldattr);
  struct termios newattr = oldattr;
  newattr.c_lflag &= ~(ICANON | ECHO);
  tcsetattr(STDIN_FILENO, TCSANOW, &newattr);
  const int ch = getchar();
  tcsetattr(STDIN_FILENO, TCSANOW, &oldattr);
  return ch;
}
#endif // _WIN32

// standard C/C++ header:
#include <cstring>
#include <mutex>
#include <string>

/* provides a class for a simple thread-safe mini-shell.
 *
 * It is assumed that one thread may await user input (read()) while
 * another thread may (or may not) output text from time to time.
 * The mini-shell guarantees that the input line is always the last line.
 */
class Console {

  // variable:
  private:
    // mutex for console I/O
    std::mutex _mtx;
    // current input
    std::string _input;
    // prompt output
    std::string _prompt;

  // methods:
  public:
    /// constructor.
    Console() { }

    // disabled:
    Console(const Console&) = delete;
    Console& operator = (const Console&) = delete;

    // reads a line from console and returns input string
    std::string read();

    /* writes text to console.
     *
     * text the text
     * size size of text
     */
    void write(const char *text, std::size_t size);
    void write(const char *text) { write(text, std::strlen(text)); }
    void write(const std::string &text) { write(text.c_str(), text.size()); }
};

// standard C/C++ header:
#include <atomic>
#include <chrono>
#include <functional> // std::ref
#include <iomanip>
#include <iostream>
#include <sstream>
#include <thread>

std::string Console::read()
{
  { // activate prompt
    std::lock_guard<std::mutex> lock(_mtx);
    _prompt = "> "; _input.clear();
    std::cout << _prompt << std::flush;
  }
#ifdef _WIN32
  enum { Enter = '\r', BackSpc = '\b' };
#else // (not) _WIN32
  enum { Enter = '\n', BackSpc = 127 };
#endif // _WIN32
  // input loop
  for (;;) {
    switch (int c = getChar()) {
      case Enter: {
        std::lock_guard<std::mutex> lock(_mtx);
        std::string input = _input;
        _prompt.clear(); _input.clear();
        std::cout << std::endl;
        return input;
      } // unreachable: break;
      case BackSpc: {
        std::lock_guard<std::mutex> lock(_mtx);
        if (_input.empty()) break; // nothing to do
        _input.pop_back();
        std::cout << "\b \b" << std::flush;
      } break;
      default: {
        if (c < ' ' || c >= '\x7f') break;
        std::lock_guard<std::mutex> lock(_mtx);
        _input += c;
        std::cout << (char)c << std::flush;
      } break;
    }
  }
}

void Console::write(const char *text, size_t len)
{
  if (!len) return; // nothing to do
  bool eol = text[len - 1] == '\n';
  std::lock_guard<std::mutex> lock(_mtx);
  // remove current input echo
  if (size_t size = _prompt.size() + _input.size()) {
    std::cout
      << std::setfill('\b') << std::setw(size) << ""
      << std::setfill(' ') << std::setw(size) << ""
      << std::setfill('\b') << std::setw(size) << "";
  }
  // print text (exactly len characters; text need not be NUL-terminated)
  std::cout.write(text, (std::streamsize)len);
  if (!eol) std::cout << std::endl;
  // print current input echo
  std::cout << _prompt << _input << std::flush;
}

// a sample application

// shared data for main thread and data processing thread
struct Shared {
  // flag: true ... exit communication thread and main loop
  std::atomic<bool> exit;
  // flag: true ... start data processing
  std::atomic<bool> start;
  // the mini console
  Console console;

  // constructor.
  Shared(): exit(false), start(true) { }
};

void dataProc(Shared &shared)
{
  while (!shared.exit) {
    // "busy" wait for start (condition would be more elegant)
    while (!shared.start) {
      if (shared.exit) return;
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    // do data processing
    shared.console.write("Starting data processing.");
    for (int i = 0, n = 20; i < n; ++i) {
      // "busy" wait for start (condition would be more elegant)
      if (!shared.start) {
        shared.console.write("Data processing stopped.");
        while (!shared.start) {
          if (shared.exit) return;
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        shared.console.write("Data processing restarted.");
      }
      // consume some time (to simulate heavy computation)
      std::this_thread::sleep_for(std::chrono::milliseconds(250));
      // do some console output about progress
      { std::ostringstream fmt;
        fmt << "Step " << i + 1 << '/' << n;
        shared.console.write(fmt.str());
      }
    }
    shared.console.write("Data processing done.");
    shared.start = false;
  }
}

void processInput(const std::string &input, Shared &shared)
{
  if (input == "start") shared.start = true;
  else if (input == "stop") shared.start = false;
  else if (input == "exit") shared.exit = true;
  else if (input.size()) shared.console.write("Wrong command!");
}

int main()
{
  Shared shared;
  // start a thread for some kind of data processing
  std::thread threadDataProc(&dataProc, std::ref(shared));
  // main loop
  while (!shared.exit) {
    shared.console.write("Commands: start stop exit");
    std::string input = shared.console.read();
    processInput(input, shared);
  }
  // join data processing thread
  threadDataProc.join();
  // done
  return 0;
}

I compiled and tested it with VS2013 as well as with g++ in bash/Xterm under cygwin on Windows 10. (cygwin was the closest thing to Linux I had at hand.)

Snapshot of miniShell VS2013/cmd.exe (black) vs. g++/bash/Xterm/cygwin (blue)

Please keep in mind that I wrote this code with simplicity taking priority over perfection or comfort.
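
The comments in dataProc() mention that a condition would be more elegant than the "busy" wait. A minimal sketch of that idea, assuming the two atomic flags in Shared were replaced by a small class, could look like this. The class name RunState and its methods are assumptions for illustration and are not part of the code above.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical replacement for the two atomic flags in Shared
// (class and member names are assumptions, not from miniShell.cc).
class RunState {
  public:
    // sets or clears the start flag and wakes up any waiting thread
    void setStart(bool value)
    {
      { std::lock_guard<std::mutex> lock(_mtx);
        _start = value;
      }
      _cond.notify_all();
    }

    // requests termination and wakes up any waiting thread
    void setExit()
    {
      { std::lock_guard<std::mutex> lock(_mtx);
        _exit = true;
      }
      _cond.notify_all();
    }

    // blocks without polling until start or exit is set;
    // returns true to start working, false to terminate
    bool waitForStart()
    {
      std::unique_lock<std::mutex> lock(_mtx);
      _cond.wait(lock, [this] { return _start || _exit; });
      return !_exit;
    }

  private:
    std::mutex _mtx;                // guards _start and _exit
    std::condition_variable _cond;  // signalled on every flag change
    bool _start = false;
    bool _exit = false;
};
```

The data processing thread would then call waitForStart() instead of sleeping in 100 ms steps, and processInput() would call setStart() and setExit(). The predicate passed to wait() also covers spurious wake-ups.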