1. It's easy to run several sequentiap programs in parallel. Problems arise when we try to make that programs to communicate.
Let one thread
2. to cope with that problems, following notions were invented:
- token
- token container
- token stream.
3. Since the main problem is communication, the main building blocks are communucation facilities, or communicators for short.
4. We use only communicators which can be described as storage for tokens. Tokens can be:
- black/colored (signal with or without message)
- scalar/sream (during the lifetime of a communicator, single or multiple tokens can pe passed)
So, we have 4 communication protocols:
- scalar signal (examples: Thread.join(), CoundownLatch.await())
- scalar message (examples: Future, CompletableFuture)
- stream signal (Semaphore, CoundownLatch.countdown())
- stream of messages (BlockingQueue)
For each protocols, we may have different interface variants:
- synchronous (BlockingQueue, Semaphore, Future, CompletableFuture)
- external asynchronous (CompletableFuture)
- internal asynchronous (not explicitly present in standaed Java library). This is the most interesting part of the current work.
5. The main communication problem is synchronization of fast comsumer with slow producer:
- from consumer side, detect if token is not ready, and block consumer's execution
- from producer side, notify consumers of new token arrival so that the can resume their execution
The classic approach is to use synchronized/wait/notify.
6. Traditionaly, synchronous interfaces are most developed, and has communicators for almost all protocols, except for scalar signal. For scalar signal, we can model it with CountdownLatch(1).
But CountdownLatch is not a basic communicator: it has different protocols for input and output interfaces. We went to make such complex communicators out of simple building blocks. So first we create ScalarSignal:
public class ScalarSignal implements ScalarSignalIn, ScalarSygnalSyncOut {
private boolean completed = false;
public synchronized void onComplete() {
completed=true;
notifyAll();
}
public synchronized void await() {
}
now we can model CountDownLatch with primitive building blocks:
public class CountDownLatchModel implements StreamSignalIn, ScalarSygnalSyncOut {
private Semaphore inp;
private ScalarSignal out = new ScalarSignal();
public CountDownLatchModel (int count) {
inp=new Semaphore(1-count);
new Thread(this::runAction).start();
}
void runAction() {
inp.aquire();
out.onComplete();
}
public void countdown() {
inp.release();
}
public void await() {
out.await();
}
}
This model works, but consumes a lot of memory for the thread stack. As a result, such communicators are not constructed this way. Instead, error-prone low-level synchronized/wait/notify approach is used.
The way out is to use actors instead of threads:
public class CountDownLatchActor StreamSignalIn, ScalarSygnalSyncOut extends AsyncProc {
private AsyncSemaphore inp;
private ScalarSignal out = new ScalarSignal();
public CountDownLatchActor (int count) {
inp=new AsyncSemaphore (1-count);
}
void runAction() {
out.onComplete();
}
public void countdown() {
inp.release();
}
public void await() {
out.await();
}
}