Saturday, October 3, 2020

Pinciples of structured parallel programming (part I)

 1. It's easy to run several sequentiap programs in parallel. Problems arise when we try to make that programs to communicate. 

Let one thread 

2. to cope with that problems, following notions were invented:

 - token

- token container

- token stream.

3. Since the main problem is communication, the main building blocks are communucation facilities, or communicators for short.

4. We use only communicators which can be described as storage for tokens. Tokens can be:

 - black/colored (signal with or without message)

- scalar/sream (during the lifetime of a communicator, single or multiple tokens can pe passed)

So, we have 4 communication protocols:

- scalar signal (examples: Thread.join(), CoundownLatch.await())

- scalar message (examples: Future, CompletableFuture)

- stream signal (Semaphore, CoundownLatch.countdown())

- stream of messages (BlockingQueue)

For each protocols, we may have different interface variants:

- synchronous (BlockingQueue, Semaphore, Future, CompletableFuture)

- external asynchronous (CompletableFuture)

- internal asynchronous (not explicitly present in standaed Java library). This is the most interesting part of the current work.


5. The main communication problem is synchronization of fast comsumer with slow producer:

- from consumer side, detect if token is not ready, and block consumer's execution

- from producer side, notify consumers of new token arrival so that the can resume their execution

The classic approach is to use synchronized/wait/notify.

6. Traditionaly, synchronous interfaces are most developed, and has communicators for almost all protocols, except for scalar signal. For scalar signal, we can model it with CountdownLatch(1).

But CountdownLatch is not a basic communicator: it has different protocols for input and output interfaces. We went to make such complex communicators out of simple building blocks. So first we create ScalarSignal:

public class ScalarSignal implements ScalarSignalIn, ScalarSygnalSyncOut {

      private boolean completed = false;

      public synchronized void onComplete() {

           completed=true;

           notifyAll();

     } 

      public synchronized void await() {

           while (!completed) {
              wait();
           }
     } 

}


now we can model CountDownLatch with primitive building blocks:


public class CountDownLatchModel  implements StreamSignalIn, ScalarSygnalSyncOut {

    private Semaphore inp;

    private ScalarSignal out = new ScalarSignal();


   public  CountDownLatchModel (int count) {

     inp=new Semaphore(1-count);

         new Thread(this::runAction).start();

  }

  void runAction() {

         inp.aquire();

        out.onComplete();

  }

  public void countdown() {

        inp.release();

  }

  public void await() {

     out.await();

 }

}

This model works, but consumes a lot of memory for the thread stack. As a result, such communicators are not constructed this way. Instead, error-prone low-level synchronized/wait/notify approach is used.

The way out is to use actors instead of threads:


public class CountDownLatchActor StreamSignalIn, ScalarSygnalSyncOut extends AsyncProc {

    private AsyncSemaphore inp;

    private ScalarSignal out = new ScalarSignal();


   public CountDownLatchActor (int count) {

     inp=new AsyncSemaphore (1-count);

  }


  void runAction() {

        out.onComplete();

  }

  public void countdown() {

        inp.release();

  }

  public void await() {

     out.await();

 }

}

The differences are:

- AsyncSemaphore instead of Semaphore 
- no thread is started, the method runAction is invoked automatically when AsyncSemaphore is unblocked
- no need to invoke inp.aquire, because the semaphore will never be used again

So what is that magic AsyncSemaphore, which allowed us to get rid of thread stack?
It is a communicator which implements internal asynchronous interface in its output side.

(to be continued).

Thursday, July 23, 2020

Anatomy of asynchronous tasks

Anatomy of asynchronous interaction

When a parallel thread needs more data, it calls an interface function, which returns the data as a result. If data is not ready, the function blocks the caller's stack. Asynchronous tasks may not block, so the communication protocol is different: the task calls the request procedure with callback parameter. This request procedure exits immediately, and the calling task also exits, leaving the worker thread. Then, when the data is ready (it could be in the same moment), the callback procedure is called with data as a parameter. This call resubmits the task to a thread pool, if there are no other blockers. For example, if the task is running, processing previous portion of data, submitting it again may be dangerous and should be prevented. Such tasks with sequential processing of incoming data are called actors and  first was proposed by Carl Hewitt.

Details of the interaction protocol can be different. One important feature is the number of callback invocations for single request invocation. There are 3 variants:
 - single callback call - this is how tasks generated by CompletableFuture factory methods  work. They get single data item, process it, compute the result and never return to execution.
 - unlimited callback calls - Hewitt's actors. For each incoming data item. a user-defined procedure is invoked.
 - Reactive streams protocol  -  request procedure is split in 2: Publisher.subscribe and Subscription.request. Subscribe only informs caller that a new client wants to receive data. Request  procedure has a parameter - number of tokens which client is able to accept. This procedure can be called multiple times, after client finishes to process next portion of data.


Taxonomy of actor's ports

Another protocol attribute is whether the protocol allows completion signals. Completion signals an be of 2 kinds: normal and exceptional. CompletableFuture protocol supports only exceptional signal, there is no need for normal completion, as all data interactions pass only single data token. Akka actors does not support either signals. Reactive streams support both.

Yet another feature is the colour of tokens. Black tokens are indistinguishable and only number of tokens should be passed. Colour tokens are normal data messages. All 3 above mentioned protocols support colour tokens. Black tokens are supported by the request procedure of the reactive protocol. But in synchronous parallel programming, black tokens were used long ago, with the help of Dijkstra's semaphores.

Functioning of actor's body

Actor's functionality can be described in Petri Net terms as follows: actor consists of 2 owns places and a transition. Control place is internal and is not accessible outside the actors internals. It can contain at most 1 black token. Data place can contain unlimited number of data tokens, send by connected data providers. When both places are not empty, the actor is fired, with tokens extracted from the places and saved in the actor's body. Firing is implemented as submitting to a thread pool. When actor gets to a processor, the user-defined procedure with single parameter - data token - is called. The processing of control token is hidden from the user. When the user-defined procedure returns, the control token is returned to its place, which may cause another submission to a thread pool, if more data tokens exist.


Actor's toolkit 

Nothing prevents us to construct an actor with multiple token places (which are usually called ports) of various types. And in fact, output connection for reactive Publisher consists of 2 ports: one coloured port for reference to subscribers and another black port  for request counter. But all implementations I looked at do not provide access to port classes and so do not allow to create actors with arbitrary port configuration. So I developed generic actor library DF4J which has base classes for various actors and ports. Ports can are freely combined in a single actor. Actors can be aggregated in arbitrary dataflow graph with unified error handling. See https://github.com/akaigoro/df4j.