Direct communication between JavaScript in Jupyter and server via IPython kernel

Solution 1:

OK, so I found a solution for now, but it is not great. Instead of just waiting for a reply and keeping the main loop busy, I made the receive non-blocking and interleaved it with the kernel's do_one_iteration to force it to handle messages:

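while True:
    try:
        # Non-blocking receive: raises zmq.error.Again if nothing has arrived yet
        rep = zmq_socket.recv(flags=zmq.NOBLOCK).decode("utf-8")
        break
    except zmq.error.Again:
        # No reply yet: let the kernel handle one pending message, then retry
        kernel.do_one_iteration()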

It works, but unfortunately it is not really portable and it messes up the Jupyter evaluation stack (all queued evaluations will be processed here instead of in order)...
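
One weakness of such a loop is that it spins forever if the reply never comes. Here is a minimal sketch of the same polling pattern with a deadline. The names wait_for_reply, try_recv and service_once are hypothetical: with pyzmq, try_recv would wrap the non-blocking recv and return None when nothing has arrived, and service_once would be the kernel call. Plain Python stand-ins are used here so that the snippet runs on its own.

```python
import time


def wait_for_reply(try_recv, service_once, timeout=5.0):
    """Poll try_recv() until it returns a reply, doing other work in between."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        reply = try_recv()
        if reply is not None:
            return reply
        # Nothing received yet: give the other party a chance to make progress
        service_once()
    raise TimeoutError(f"no reply within {timeout} s")


# Stand-ins for the socket and the kernel (hypothetical, for illustration only)
pending = [None, None, "ready"]
serviced = []
reply = wait_for_reply(lambda: pending.pop(0), lambda: serviced.append(1))
```

It does not fix the ordering problem described above, it only bounds the wait.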

Alternatively, there is another way that is more appealing:

import zmq
import asyncio
import nest_asyncio

nest_asyncio.apply()

zmq_socket.send(b"ready")
async def enforce_receive():
    await kernel.process_one(True)
    return zmq_socket.recv().decode("utf-8")
loop = asyncio.get_event_loop()
rep = loop.run_until_complete(enforce_receive())

but in this case you need to know in advance that you expect the kernel to receive exactly one message, and relying on nest_asyncio is not ideal either.
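
For context, the reason nest_asyncio comes into play at all is that asyncio refuses to run an event loop from inside code that is already being executed by that loop, which I assume is the situation of a notebook cell. The sketch below reproduces that refusal with the standard library only; inner and outer are hypothetical names and no kernel is involved.

```python
import asyncio


async def inner():
    return "reply"


async def outer():
    # We are already inside a running loop here, like code run by the kernel
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"


message = asyncio.run(outer())
print(message)
```

nest_asyncio patches the loop so that this nested call is allowed, which is why it has to be applied before calling run_until_complete.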

Here is a link to an issue on this topic on GitHub, along with an example notebook.

UPDATE

I finally managed to solve my issue completely, without shortcomings. The trick is to analyze every incoming message. The irrelevant messages are put back in the queue in order, while the comm-related ones are processed on the spot:

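# Requires an ipykernel version that exposes 'msg_queue' and SHELL_PRIORITY
import zmq
from IPython import get_ipython
from ipykernel.kernelbase import SHELL_PRIORITY


class CommProcessor: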
    """
    @brief     Re-implementation of ipykernel.kernelbase.do_one_iteration
                that handles only comm messages on the spot and puts the
                other ones back in the queue.

    @details   Calling 'do_one_iteration' interferes with the kernel
                'msg_queue': some messages are processed too soon, which
                is likely to corrupt the kernel state. This method only
                processes comm messages to avoid such side effects.
    """

    def __init__(self):
        self.__kernel = get_ipython().kernel
        self.qsize_old = 0

    def __call__(self, unsafe=False):
        """
        @brief      Check once whether there are pending comm-related events
                    in the shell stream message priority queue.

        @param[in]  unsafe     Whether to assume that checking if the number
                                of pending messages has changed is enough. It
                                makes the evaluation much faster but flawed.
        """
        # Flush all incoming messages on shell_stream only.
        # Note that this is a faster implementation of ZMQStream.flush
        # that only handles incoming messages. It reduces the computation
        # time from about 10us to 20ns.
        # https://github.com/zeromq/pyzmq/blob/e424f83ceb0856204c96b1abac93a1cfe205df4a/zmq/eventloop/zmqstream.py#L313
        shell_stream = self.__kernel.shell_streams[0]
        shell_stream.poller.register(shell_stream.socket, zmq.POLLIN)
        events = shell_stream.poller.poll(0)
        while events:
            _, event = events[0]
            if event:
                shell_stream._handle_recv()
                shell_stream.poller.register(
                    shell_stream.socket, zmq.POLLIN)
                events = shell_stream.poller.poll(0)

        qsize = self.__kernel.msg_queue.qsize()
        if unsafe and qsize == self.qsize_old:
            # The number of messages in the queue has not changed since
            # the last time it was checked. Assume those messages are the
            # same as before and return early.
            return

        # One must go through all the messages to keep them in order
        for _ in range(qsize):
            priority, t, dispatch, args = \
                self.__kernel.msg_queue.get_nowait()
            if priority <= SHELL_PRIORITY:
                _, msg = self.__kernel.session.feed_identities(
                    args[-1], copy=False)
                msg = self.__kernel.session.deserialize(
                    msg, content=False, copy=False)
            else:
                # Do not spend time analyzing already rejected messages
                msg = None
            if msg is None or 'comm_' not in msg['header']['msg_type']:
                # The message is not related to comm, so put it back in
                # the queue after lowering its priority so that it is sent
                # to the "end of the queue", i.e. just at the right place:
                # after the next unchecked messages, after the other
                # messages already put back in the queue, but before the
                # next one to go the same way. Note that all shell
                # messages have SHELL_PRIORITY by default.
                self.__kernel.msg_queue.put_nowait(
                    (SHELL_PRIORITY + 1, t, dispatch, args))
            else:
                # Comm message. Processing it right now.
                comm_handler = getattr(
                    self.__kernel.comm_manager, msg['header']['msg_type'])
                msg['content'] = self.__kernel.session.unpack(msg['content'])
                comm_handler(None, None, msg)

        self.qsize_old = self.__kernel.msg_queue.qsize()

process_kernel_comm = CommProcessor()
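
To show why putting messages back with a lowered priority keeps them in order, here is a small standalone sketch of that trick using the standard library queue.PriorityQueue instead of the kernel's msg_queue. The function process_selected, the string payloads and the value 10 for SHELL_PRIORITY are assumptions made for the illustration, not the kernel's actual message format.

```python
import queue

SHELL_PRIORITY = 10  # assumed value, standing in for ipykernel's constant


def process_selected(q, is_selected, handle):
    """Handle selected items now and requeue the others without reordering."""
    for _ in range(q.qsize()):
        priority, index, payload = q.get_nowait()
        if priority <= SHELL_PRIORITY and is_selected(payload):
            handle(payload)
        else:
            # Lower priority: sorts after every item not yet inspected, and
            # 'index' keeps the requeued items in their arrival order.
            q.put_nowait((SHELL_PRIORITY + 1, index, payload))


q = queue.PriorityQueue()
messages = ["execute_a", "comm_msg", "execute_b", "comm_close"]
for index, payload in enumerate(messages):
    q.put_nowait((SHELL_PRIORITY, index, payload))

handled = []
process_selected(q, lambda p: p.startswith("comm_"), handled.append)
remaining = [q.get_nowait()[2] for _ in range(q.qsize())]
print(handled, remaining)
```

The selected items are handled immediately, and the remaining ones come out of the queue in the same relative order they went in.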