#2586 Threaded Streams

SlimerDude Tue 27 Dec 2016

The classic example of using Streams is to have a producer in one thread that puts bytes onto a queue, and a consumer on different thread that reads from it.

.   Thread 1
--> Producer
                 --> Consumer
                     Thread 2  

To accomplish this, Buf immediately comes to mind as it is a buffer complete with an InStream and an OutStream. But it is also (obviously) mutable.

A quick question then, is Buf safe to use in muti-threaded environments?

If not (as I assume), then is there a standard idiomatic Fantom way of achieving this, or would I need to roll my own?

brian Tue 27 Dec 2016

Earlier this year I added the ability for Buf to be converted to an immutable structure. Its extremely efficient because it creates a new immutable ConstBuf instance with a reference to the original backing byte[] array (and old immutable buf looses its reference to it)

fansh> buf := Buf().print("hello world")
MemBuf(pos=11 size=11)
fansh> constBuf := buf.toImmutable
ConstBuf(pos=0 size=11)
fansh> buf.size 
fansh> constBuf.size
fansh> constBuf.in.readAllStr
hello world

SlimerDude Tue 27 Dec 2016

Hi Brian,

The immutable Buf is nice but it just allows me to pass chunks of data between threads, similar to passing other types of messages.

But what I'm after is the ability for the 1st thread to continually produce data, and the 2nd thread to continually consume the same data. That way I'm constantly streaming data from one thread to another.

In effect I'd like to know if it's possible to do something similar to the Producer Consumer Problem but without the constraint of a fixed buffer size.

SlimerDude Wed 28 Dec 2016

My conundrum is that I want both the Consumer and Producer threads to be continually running in a loop, and not just doing a bit of work whenever a message comes in.

The solution is to have a 3rd thread / Actor that collates all the data, which both threads call on.

.--->--->---                   --->--->---
| Producer  |      Data       | Consumer  |
▲  Thread   ▼ ---> Queue <--> ▲  Thread   ▼
|   Loop    |      Actor      |   Loop    |
 ---<---<---                   ---<---<--- 

When the producer has data, it can message it to the Data Queue Actor in the usual way, which appends the data on a local Buf. The consumer thread, when it wants to check for data, simply messages the Data Queue Actor which returns what it has (via Future.get()).

An immutable Buf can be used to carry binary data to and from the Data Queue Actor.

When I next get time, I may wrap up the above in some handy Stream implementations and override the read / write methods so it all seems a little more streamy and not so much like discrete data passing.

brian Mon 2 Jan

Couple thoughts I have:

  • be careful with multiple actors that you don't get into a situation processing huge files that you build up a huge amount of buffered bytes in memory; you might need some flow control in there
  • might want to take a look at this ancient contest program
  • I usually find that processing disk I/O is bottlenecked by I/O speed so doesn't make sense to split processing into different thread/actor

Login or Signup to reply.