Saturday, October 3, 2020

Principles of structured parallel programming (part I)

 1. It's easy to run several sequential programs in parallel. Problems arise when we try to make those programs communicate.


2. To cope with these problems, the following notions were invented:

 - token

- token container

- token stream.

3. Since the main problem is communication, the main building blocks are communucation facilities, or communicators for short.

4. We use only communicators which can be described as storage for tokens. Tokens can be:

 - black/colored (a pure signal, or a signal carrying a message)

- scalar/stream (during the lifetime of a communicator, a single token or multiple tokens can be passed)

So, we have 4 communication protocols:

- scalar signal (examples: Thread.join(), CountDownLatch.await())

- scalar message (examples: Future, CompletableFuture)

- stream signal (examples: Semaphore, CountDownLatch.countDown())

- stream of messages (BlockingQueue)
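A minimal sketch of the two message-carrying protocols, using only standard library classes (the class name ProtocolsDemo is mine, for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

public class ProtocolsDemo {
    public static void main(String[] args) throws Exception {
        // Scalar message: exactly one token passes during the communicator's lifetime.
        CompletableFuture<Integer> scalar = new CompletableFuture<>();
        new Thread(() -> scalar.complete(42)).start();
        System.out.println("scalar: " + scalar.get());

        // Stream of messages: many tokens pass during the communicator's lifetime.
        BlockingQueue<String> stream = new ArrayBlockingQueue<>(10);
        new Thread(() -> {
            try {
                stream.put("a");
                stream.put("b");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        System.out.println("stream: " + stream.take() + stream.take());
    }
}
```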

For each protocol, we may have different interface variants:

- synchronous (BlockingQueue, Semaphore, Future, CompletableFuture)

- external asynchronous (CompletableFuture)

- internal asynchronous (not explicitly present in the standard Java library). This is the most interesting part of the current work.
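The contrast between the first two variants can be shown on a single CompletableFuture: one client blocks synchronously on get(), another registers a callback through the external asynchronous interface (the class name InterfaceVariants is mine, for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class InterfaceVariants {
    public static void main(String[] args) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        CompletableFuture<Integer> f = new CompletableFuture<>();

        // External asynchronous: the client registers a callback and never blocks.
        f.thenAccept(v -> {
            System.out.println("callback got " + v);
            done.countDown();
        });

        new Thread(() -> f.complete(7)).start();

        // Synchronous: another client blocks on the same communicator.
        System.out.println("get() returned " + f.get());
        done.await();
    }
}
```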


5. The main communication problem is synchronization of a fast consumer with a slow producer:

- from the consumer side, detect that a token is not ready yet, and block the consumer's execution

- from the producer side, notify consumers of new token arrival so that they can resume their execution

The classic approach is to use synchronized/wait/notify.

6. Traditionally, synchronous interfaces are the most developed, and have communicators for almost all protocols, except for scalar signal. For scalar signal, we can model it with new CountDownLatch(1).
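A minimal sketch of that modeling: a latch with count 1 fires once and can never fire again, which is exactly the scalar signal protocol.

```java
import java.util.concurrent.CountDownLatch;

public class ScalarSignalViaLatch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        new Thread(() -> {
            System.out.println("producer: work finished");
            done.countDown();           // emit the single signal token
        }).start();
        done.await();                   // block until the token arrives
        System.out.println("consumer: signal received");
    }
}
```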

But CountdownLatch is not a basic communicator: it has different protocols for input and output interfaces. We went to make such complex communicators out of simple building blocks. So first we create ScalarSignal:

public class ScalarSignal implements ScalarSignalIn, ScalarSignalSyncOut {

    private boolean completed = false;

    public synchronized void onComplete() {
        completed = true;
        notifyAll();
    }

    public synchronized void await() throws InterruptedException {
        while (!completed) {
            wait();
        }
    }
}
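A quick usage sketch of this communicator (the interface declarations are omitted here so that the example is self-contained):

```java
public class ScalarSignalDemo {
    // Same communicator as in the text, without the interface declarations.
    static class ScalarSignal {
        private boolean completed = false;

        public synchronized void onComplete() {
            completed = true;
            notifyAll();
        }

        public synchronized void await() throws InterruptedException {
            while (!completed) {
                wait();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScalarSignal signal = new ScalarSignal();
        new Thread(signal::onComplete).start();
        signal.await();                 // returns once onComplete() has run
        System.out.println("signal fired");
    }
}
```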


Now we can model CountDownLatch with primitive building blocks:


public class CountDownLatchModel implements StreamSignalIn, ScalarSignalSyncOut {

    private final Semaphore inp;
    private final ScalarSignal out = new ScalarSignal();

    public CountDownLatchModel(int count) {
        inp = new Semaphore(1 - count);
        new Thread(this::runAction).start();
    }

    void runAction() {
        try {
            inp.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        out.onComplete();
    }

    public void countDown() {
        inp.release();
    }

    public void await() throws InterruptedException {
        out.await();
    }
}

This model works, but consumes a lot of memory for the thread stack. As a result, such communicators are not constructed this way. Instead, the error-prone low-level synchronized/wait/notify approach is used.
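A compact, self-contained check that the model behaves like a latch (interfaces omitted; class names are mine, for illustration):

```java
import java.util.concurrent.Semaphore;

public class CountDownLatchModelDemo {
    static class ScalarSignal {
        private boolean completed = false;
        public synchronized void onComplete() { completed = true; notifyAll(); }
        public synchronized void await() throws InterruptedException {
            while (!completed) wait();
        }
    }

    static class CountDownLatchModel {
        private final Semaphore inp;
        private final ScalarSignal out = new ScalarSignal();

        CountDownLatchModel(int count) {
            inp = new Semaphore(1 - count);  // count releases bring permits to 1
            new Thread(this::runAction).start();
        }

        void runAction() {
            try {
                inp.acquire();               // blocks until count releases happened
                out.onComplete();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void countDown() { inp.release(); }
        public void await() throws InterruptedException { out.await(); }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatchModel latch = new CountDownLatchModel(3);
        for (int i = 1; i <= 3; i++) {
            latch.countDown();
            System.out.println("countDown " + i);
        }
        latch.await();
        System.out.println("latch released");
    }
}
```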

The way out is to use actors instead of threads:


public class CountDownLatchActor extends AsyncProc implements StreamSignalIn, ScalarSignalSyncOut {

    private final AsyncSemaphore inp;
    private final ScalarSignal out = new ScalarSignal();

    public CountDownLatchActor(int count) {
        inp = new AsyncSemaphore(1 - count);
    }

    void runAction() {
        out.onComplete();
    }

    public void countDown() {
        inp.release();
    }

    public void await() throws InterruptedException {
        out.await();
    }
}

The differences are:

- AsyncSemaphore instead of Semaphore
- no thread is started; the method runAction is invoked automatically when the AsyncSemaphore is unblocked
- no need to invoke inp.acquire(), because the semaphore will never be used again

So what is that magic AsyncSemaphore which allowed us to get rid of the thread stack?
It is a communicator which implements an internal asynchronous interface on its output side.

(to be continued).

Thursday, July 23, 2020

Anatomy of asynchronous tasks

Anatomy of asynchronous interaction

When a parallel thread needs more data, it calls an interface function which returns the data as a result. If the data is not ready, the function blocks on the caller's stack. Asynchronous tasks may not block, so the communication protocol is different: the task calls a request procedure with a callback parameter. The request procedure exits immediately, and the calling task also exits, leaving the worker thread. Later, when the data is ready (which could be at the same moment), the callback procedure is called with the data as a parameter. This call resubmits the task to a thread pool, if there are no other blockers. For example, if the task is already running, processing the previous portion of data, submitting it again may be dangerous and should be prevented. Such tasks with sequential processing of incoming data are called actors and were first proposed by Carl Hewitt.

Details of the interaction protocol can differ. One important feature is the number of callback invocations per request invocation. There are 3 variants:
 - single callback call - this is how tasks generated by CompletableFuture factory methods work. They get a single data item, process it, compute the result, and never return to execution.
 - unlimited callback calls - Hewitt's actors. For each incoming data item, a user-defined procedure is invoked.
 - Reactive streams protocol - the request procedure is split in 2: Publisher.subscribe and Subscription.request. Subscribe only informs the caller that a new client wants to receive data. The request procedure has a parameter - the number of tokens the client is able to accept. This procedure can be called multiple times, after the client finishes processing the next portion of data.
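The reactive variant can be sketched with the standard java.util.concurrent.Flow API (the class name ReactiveDemo is mine, for illustration):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ReactiveDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);            // "I can accept 1 token"
            }
            public void onNext(Integer item) {
                System.out.println("got " + item);
                subscription.request(1); // ask for the next token
            }
            public void onError(Throwable t) { done.countDown(); }
            public void onComplete() {
                System.out.println("completed");
                done.countDown();
            }
        });
        publisher.submit(1);
        publisher.submit(2);
        publisher.close();               // normal completion signal
        done.await();
    }
}
```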


Taxonomy of actor's ports

Another protocol attribute is whether the protocol allows completion signals. Completion signals can be of 2 kinds: normal and exceptional. The CompletableFuture protocol supports only the exceptional signal; there is no need for normal completion, as all data interactions pass only a single data token. Akka actors support neither signal. Reactive streams support both.

Yet another feature is the colour of tokens. Black tokens are indistinguishable, and only the number of tokens is passed. Coloured tokens are normal data messages. All 3 above-mentioned protocols support coloured tokens. Black tokens are supported by the request procedure of the reactive protocol. But in synchronous parallel programming, black tokens were used long ago, with the help of Dijkstra's semaphores.
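A semaphore seen as a black-token container: release() adds an indistinguishable token, acquire() removes one; only the count is transferred, never data.

```java
import java.util.concurrent.Semaphore;

public class BlackTokens {
    public static void main(String[] args) throws InterruptedException {
        Semaphore tokens = new Semaphore(0);
        new Thread(() -> tokens.release(3)).start();  // emit 3 black tokens
        tokens.acquire(3);                            // wait until all 3 arrive
        System.out.println("3 tokens consumed");
    }
}
```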

Functioning of actor's body

An actor's functionality can be described in Petri net terms as follows: an actor consists of 2 places and a transition. The control place is internal and is not accessible outside the actor's internals. It can contain at most 1 black token. The data place can contain an unlimited number of data tokens, sent by connected data providers. When both places are non-empty, the actor is fired, with tokens extracted from the places and saved in the actor's body. Firing is implemented as submitting to a thread pool. When the actor gets to a processor, the user-defined procedure is called with a single parameter - the data token. The processing of the control token is hidden from the user. When the user-defined procedure returns, the control token is returned to its place, which may cause another submission to the thread pool, if more data tokens exist.
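This firing rule can be sketched directly (a simplified illustration of the mechanism, not the DF4J implementation; the class name MiniActor is mine):

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MiniActor {
    private final ConcurrentLinkedQueue<String> dataPlace = new ConcurrentLinkedQueue<>();
    private boolean controlToken = true;   // control place: at most 1 black token
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    final CountDownLatch processed = new CountDownLatch(3);

    // Called by producers: put a data token into the data place and try to fire.
    public synchronized void post(String msg) {
        dataPlace.add(msg);
        tryFire();
    }

    // Fire only when both the control token and a data token are present.
    private synchronized void tryFire() {
        if (controlToken && !dataPlace.isEmpty()) {
            controlToken = false;                 // consume the control token
            String token = dataPlace.poll();      // consume one data token
            pool.submit(() -> {
                runAction(token);                 // user-defined processing
                synchronized (this) {
                    controlToken = true;          // return the control token
                    tryFire();                    // maybe fire again
                }
            });
        }
    }

    void runAction(String token) {
        System.out.println("processed " + token);
        processed.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        MiniActor actor = new MiniActor();
        actor.post("a");
        actor.post("b");
        actor.post("c");
        actor.processed.await();
        actor.pool.shutdown();
    }
}
```

The single control token guarantees sequential processing even though the pool has 2 threads.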


Actor's toolkit 

Nothing prevents us from constructing an actor with multiple token places (which are usually called ports) of various types. In fact, the output connection of a reactive Publisher consists of 2 ports: one coloured port for references to subscribers and another black port for the request counter. But all implementations I looked at do not provide access to port classes and so do not allow creating actors with arbitrary port configurations. So I developed the generic actor library DF4J, which has base classes for various actors and ports. Ports can be freely combined in a single actor. Actors can be aggregated in an arbitrary dataflow graph with unified error handling. See https://github.com/akaigoro/df4j.


Wednesday, August 29, 2018

The essence of asynchronous programming

Asynchronous programming is a kind of parallel programming, where the fundamental unit of computation is a task and not a thread.

A typical thread can be in 2 states: blocked and active. It is blocked while waiting for access to a shared resource, or waiting for information which is going to arrive through input/output devices. It is active when its code is executing on a processor. In both states, the thread keeps a significant amount of memory (typically 0.5-1 megabyte) for its procedure stack. This restricts the maximum number of threads which can exist simultaneously on a single computer.

Similarly, an asynchronous task can be in blocked or active state, for the same reasons. But in the blocked state, the task is not bound to a procedure stack. A stack is assigned only when the task is active and running on a processor. Since the number of processors in contemporary computers is small (less than 100), memory consumption for stacks becomes insignificant. The memory for the task itself is usually much less than 1 megabyte, so the maximum number of tasks is orders of magnitude greater than the maximum number of threads. This allows writing massively parallel algorithms in a more natural way than when using threads.

The downside of using tasks vs threads is that the program code becomes more complicated. Partially, this can be mitigated using async/await syntax constructs, found in such languages as C#, Dart, ECMAScript, and Kotlin.

Another difference is that the constructs used to exchange information between tasks have different interfaces and implementations than the constructs for interaction between threads. But for each kind of inter-thread communication, we can implement an analogous inter-task facility. This has a significant consequence: a parallel algorithm can first be developed in abstract form, and only at the implementation stage does the programmer choose how to implement this or that activity: as a thread or as an asynchronous task. The final program can contain both threads and tasks, and they can freely interact with each other.
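A minimal sketch of such free interaction: a plain thread produces a value, an asynchronous task transforms it without occupying a thread while waiting, and the main thread consumes the result synchronously.

```java
import java.util.concurrent.CompletableFuture;

public class MixedDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> data = new CompletableFuture<>();

        // A plain thread acts as the producer...
        new Thread(() -> data.complete(21)).start();

        // ...while an asynchronous task transforms the value; no thread
        // is blocked until the value actually arrives.
        CompletableFuture<Integer> doubled = data.thenApplyAsync(x -> x * 2);

        // The main thread interacts with the task synchronously.
        System.out.println("result: " + doubled.get());
    }
}
```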