How would you implement a basic event-loop?
If you have worked with gui toolkits, you know that there is a event-loop/main-loop that should be executed after everything is done, and that will keep the application alive and responsive to different events. For example, for Qt, you would do this in main():
int main() {
QApplication app(argc, argv);
// init code
return app.exec();
}
Which in this case, app.exec() is the application's main-loop.
The obvious way to implement this kind of loop would be:
void exec() {
while (1) {
process_events(); // create a thread for each new event (possibly?)
}
}
But this caps the CPU to 100% and is practicaly useless. Now, how can I implement such an event loop that is responsive without eating the CPU altogether?
Answers are appreciated in Python and/or C++. Thanks.
Footnote: For the sake of learning, I will implement my own signals/slots, and I would use those to generate custom events (e.g. go_forward_event(steps)
). But if you know how I can use system events manually, I would like to know about that too.
I used to wonder a lot about the same!
A GUI main loop looks like this, in pseudo-code:
void App::exec() {
for(;;) {
vector<Waitable> waitables;
waitables.push_back(m_networkSocket);
waitables.push_back(m_xConnection);
waitables.push_back(m_globalTimer);
Waitable* whatHappened = System::waitOnAll(waitables);
switch(whatHappened) {
case &m_networkSocket: readAndDispatchNetworkEvent(); break;
case &m_xConnection: readAndDispatchGuiEvent(); break;
case &m_globalTimer: readAndDispatchTimerEvent(); break;
}
}
}
What is a "Waitable"? Well, it's system dependant. On UNIX it's called a "file descriptor" and "waitOnAll" is the ::select system call. The so-called vector<Waitable>
is a ::fd_set
on UNIX, and "whatHappened" is actually queried via FD_ISSET
. The actual waitable-handles are acquired in various ways, for example m_xConnection
can be taken from ::XConnectionNumber(). X11 also provides a high-level, portable API for this -- ::XNextEvent() -- but if you were to use that, you wouldn't be able to wait on several event sources simultaneously.
How does the blocking work? "waitOnAll" is a syscall that tells the OS to put your process on a "sleep list". This means you are not given any CPU time until an event occurs on one of the waitables. This, then, means your process is idle, consuming 0% CPU. When an event occurs, your process will briefly react to it and then return to idle state. GUI apps spend almost all their time idling.
What happens to all the CPU cycles while you're sleeping? Depends. Sometimes another process will have a use for them. If not, your OS will busy-loop the CPU, or put it into temporary low-power mode, etc.
Please ask for further details!
Python:
You can look at the implementation of the Twisted reactor which is probably the best implementation for an event loop in python. Reactors in Twisted are implementations of an interface and you can specify a type reactor to run: select, epoll, kqueue (all based on a c api using those system calls), there are also reactors based on the QT and GTK toolkits.
A simple implementation would be to use select:
#echo server that accepts multiple client connections without forking threads
import select
import socket
import sys
host = ''
port = 50000
backlog = 5
size = 1024
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((host,port))
server.listen(backlog)
input = [server,sys.stdin]
running = 1
#the eventloop running
while running:
inputready,outputready,exceptready = select.select(input,[],[])
for s in inputready:
if s == server:
# handle the server socket
client, address = server.accept()
input.append(client)
elif s == sys.stdin:
# handle standard input
junk = sys.stdin.readline()
running = 0
else:
# handle all other sockets
data = s.recv(size)
if data:
s.send(data)
else:
s.close()
input.remove(s)
server.close()
Generally I would do this with some sort of counting semaphore:
- Semaphore starts at zero.
- Event loop waits on semaphore.
- Event(s) come in, semaphore is incremented.
- Event handler unblocks and decrements the semaphore and processes the event.
- When all events are processed, semaphore is zero and event loop blocks again.
If you don't want to get that complicated, you could just add a sleep() call in your while loop with a trivially small sleep time. That will cause your message processing thread to yield it's CPU time to other threads. The CPU won't be pegged at 100% any more, but it's still pretty wasteful.
I would use a simple, light-weight messaging library called ZeroMQ (http://www.zeromq.org/). It is an open source library (LGPL). This is a very small library; on my server, the whole project compiles in about 60 seconds.
ZeroMQ will hugely simplify your event-driven code, AND it is also THE most efficient solution in terms of performance. Communicating between threads using ZeroMQ is much faster (in terms of speed) than using semaphores or local UNIX sockets. ZeroMQ also be a 100% portable solution, whereas all the other solutions would tie your code down to a specific operating system.
Here is a C++ event loop. At the creation of the object EventLoop
, it creates a thread which continually runs any task given to it. If there are no tasks available, the main thread goes to sleep until some task is added.
First we need a thread safe queue which allow multiple producers and at least a single consumer (the EventLoop
thread). The EventLoop
object which controls the consumers and producers. With a little change, it can be added multiple consumers (runners threads), instead of only one thread.
#include <stdio.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <set>
#include <functional>
#if defined( WIN32 )
#include <windows.h>
#endif
class EventLoopNoElements : public std::runtime_error
{
public:
EventLoopNoElements(const char* error)
: std::runtime_error(error)
{
}
};
template <typename Type>
struct EventLoopCompare {
typedef std::tuple<std::chrono::time_point<std::chrono::system_clock>, Type> TimePoint;
bool operator()(const typename EventLoopCompare<Type>::TimePoint left, const typename EventLoopCompare<Type>::TimePoint right) {
return std::get<0>(left) < std::get<0>(right);
}
};
/**
* You can enqueue any thing with this event loop. Just use lambda functions, future and promises!
* With lambda `event.enqueue( 1000, [myvar, myfoo](){ myvar.something(myfoo); } )`
* With futures we can get values from the event loop:
* ```
* std::promise<int> accumulate_promise;
* event.enqueue( 2000, [&accumulate_promise](){ accumulate_promise.set_value(10); } );
* std::future<int> accumulate_future = accumulate_promise.get_future();
* accumulate_future.wait(); // It is not necessary to call wait, except for syncing the output.
* std::cout << "result=" << std::flush << accumulate_future.get() << std::endl;
* ```
* It is just not a nice ideia to add something which hang the whole event loop queue.
*/
template <class Type>
struct EventLoop {
typedef std::multiset<
typename EventLoopCompare<Type>::TimePoint,
EventLoopCompare<Type>
> EventLoopQueue;
bool _shutdown;
bool _free_shutdown;
std::mutex _mutex;
std::condition_variable _condition_variable;
EventLoopQueue _queue;
std::thread _runner;
// free_shutdown - if true, run all events on the queue before exiting
EventLoop(bool free_shutdown)
: _shutdown(false),
_free_shutdown(free_shutdown),
_runner( &EventLoop<Type>::_event_loop, this )
{
}
virtual ~EventLoop() {
std::unique_lock<std::mutex> dequeuelock(_mutex);
_shutdown = true;
_condition_variable.notify_all();
dequeuelock.unlock();
if (_runner.joinable()) {
_runner.join();
}
}
// Mutex and condition variables are not movable and there is no need for smart pointers yet
EventLoop(const EventLoop&) = delete;
EventLoop& operator =(const EventLoop&) = delete;
EventLoop(const EventLoop&&) = delete;
EventLoop& operator =(const EventLoop&&) = delete;
// To allow multiple threads to consume data, just add a mutex here and create multiple threads on the constructor
void _event_loop() {
while ( true ) {
try {
Type call = dequeue();
call();
}
catch (EventLoopNoElements&) {
return;
}
catch (std::exception& error) {
std::cerr << "Unexpected exception on EventLoop dequeue running: '" << error.what() << "'" << std::endl;
}
catch (...) {
std::cerr << "Unexpected exception on EventLoop dequeue running." << std::endl;
}
}
std::cerr << "The main EventLoop dequeue stopped running unexpectedly!" << std::endl;
}
// Add an element to the queue
void enqueue(int timeout, Type element) {
std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
std::chrono::time_point<std::chrono::system_clock> newtime = timenow + std::chrono::milliseconds(timeout);
std::unique_lock<std::mutex> dequeuelock(_mutex);
_queue.insert(std::make_tuple(newtime, element));
_condition_variable.notify_one();
}
// Blocks until getting the first-element or throw EventLoopNoElements if it is shutting down
// Throws EventLoopNoElements when it is shutting down and there are not more elements
Type dequeue() {
typename EventLoopQueue::iterator queuebegin;
typename EventLoopQueue::iterator queueend;
std::chrono::time_point<std::chrono::system_clock> sleeptime;
// _mutex prevents multiple consumers from getting the same item or from missing the wake up
std::unique_lock<std::mutex> dequeuelock(_mutex);
do {
queuebegin = _queue.begin();
queueend = _queue.end();
if ( queuebegin == queueend ) {
if ( _shutdown ) {
throw EventLoopNoElements( "There are no more elements on the queue because it already shutdown." );
}
_condition_variable.wait( dequeuelock );
}
else {
if ( _shutdown ) {
if (_free_shutdown) {
break;
}
else {
throw EventLoopNoElements( "The queue is shutting down." );
}
}
std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
sleeptime = std::get<0>( *queuebegin );
if ( sleeptime <= timenow ) {
break;
}
_condition_variable.wait_until( dequeuelock, sleeptime );
}
} while ( true );
Type firstelement = std::get<1>( *queuebegin );
_queue.erase( queuebegin );
dequeuelock.unlock();
return firstelement;
}
};
Utility to print the current timestamp:
std::string getTime() {
char buffer[20];
#if defined( WIN32 )
SYSTEMTIME wlocaltime;
GetLocalTime(&wlocaltime);
::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03d ", wlocaltime.wHour, wlocaltime.wMinute, wlocaltime.wSecond, wlocaltime.wMilliseconds);
#else
std::chrono::time_point< std::chrono::system_clock > now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto hours = std::chrono::duration_cast< std::chrono::hours >( duration );
duration -= hours;
auto minutes = std::chrono::duration_cast< std::chrono::minutes >( duration );
duration -= minutes;
auto seconds = std::chrono::duration_cast< std::chrono::seconds >( duration );
duration -= seconds;
auto milliseconds = std::chrono::duration_cast< std::chrono::milliseconds >( duration );
duration -= milliseconds;
time_t theTime = time( NULL );
struct tm* aTime = localtime( &theTime );
::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03ld ", aTime->tm_hour, aTime->tm_min, aTime->tm_sec, milliseconds.count());
#endif
return buffer;
}
Example program using these:
// g++ -o test -Wall -Wextra -ggdb -g3 -pthread test.cpp && gdb --args ./test
// valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./test
// procdump -accepteula -ma -e -f "" -x c:\ myexe.exe
int main(int argc, char* argv[]) {
std::cerr << getTime().c_str() << "Creating EventLoop" << std::endl;
EventLoop<std::function<void()>>* eventloop = new EventLoop<std::function<void()>>(true);
std::cerr << getTime().c_str() << "Adding event element" << std::endl;
eventloop->enqueue( 3000, []{ std::cerr << getTime().c_str() << "Running task 3" << std::endl; } );
eventloop->enqueue( 1000, []{ std::cerr << getTime().c_str() << "Running task 1" << std::endl; } );
eventloop->enqueue( 2000, []{ std::cerr << getTime().c_str() << "Running task 2" << std::endl; } );
std::this_thread::sleep_for( std::chrono::milliseconds(5000) );
delete eventloop;
std::cerr << getTime().c_str() << "Exiting after 10 seconds..." << std::endl;
return 0;
}
Output test example:
02:08:28.960 Creating EventLoop
02:08:28.960 Adding event element
02:08:29.960 Running task 1
02:08:30.961 Running task 2
02:08:31.961 Running task 3
02:08:33.961 Exiting after 10 seconds...