brian Mon 9 Jun 2008

Background

I finally have a Fan script which parallelizes nicely on the Sun Fire T2000. This version of the program runs on the full 42GB dataset in a bit under 18min and weighs in at 114 LOC. If you strip the comments and write the opening braces differently, it is only 91 LOC.
real 17:48.4
user 3:27:37.4
sys 1:37:00.0
I'm sure there will be lots of speedier results, but hopefully this will turn out to be respectable performance. My real goal, though, is a simple, concise program. I have a program which runs a tad faster (best run 16:57), but if this were production code I'd go with this version, which is easier to understand and maintain.
What Doesn't Work
First, a couple of notes on what doesn't work. I'm running Fan on the JVM, so under the covers we are using Java's IO. A simple Fan program can read the bytes of the 42GB dataset in less than 5 minutes, so we aren't IO bound. The killer is converting bytes to Strings, but that is unavoidable (the price of a Unicode-enabled VM).
So the challenge is how to parallelize decoding bytes into Strings we can parse. I made a couple of attempts to divide the file up and give each thread a region of the file to read and parse, but having multiple threads with open file handles did not utilize the cores well.
Another design I tried was funneling all the parsed lines to a single totalizer thread which was responsible for managing the counter maps. That is clean and simple because there is no merge step. But after I added some diagnostics, I saw that it was spending a lot of time waiting due to flow control on the totalizer's message queue. I was able to get good performance by sending batches of thousands of lines to the totalizer, but it resulted in a more complicated program, so I ditched that strategy. This version does the totalizing in the parsing threads with an extra merge step.
Design
All thread concurrency is handled with message passing using Thread.sendAsync. The basic flow of the program (a rough Java sketch follows the list):
spawn reader thread to read in 64kb chunks of the file
spawn n parser threads to parse those 64kb chunks in parallel and totalize the results
wait for the reader to finish
send null to the parser threads to get their individual shards of results
spawn a thread to merge the shards and report on each field (hits, 404s, etc)
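This isn't the Fan script itself, but a very rough Java analogue of that flow (Fan runs on the JVM), with futures and blocking queues standing in for Thread.sendAsync; all names here are my own and the "parsing" is just a stand-in:

```java
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;

// Very rough Java analogue of the flow above: a reader hands byte chunks to
// parser tasks, each parser returns its own shard of counts, and the main
// thread merges the shards at the end.
public class PipelineSketch {
    static final int PARSERS = 4;             // number of parser threads (tunable)
    static final byte[] DONE = new byte[0];   // empty chunk = "no more work"

    public static void main(String[] args) throws Exception {
        List<BlockingQueue<byte[]>> inboxes = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(PARSERS);
        List<Future<Map<String, Long>>> shards = new ArrayList<>();

        for (int i = 0; i < PARSERS; i++) {
            BlockingQueue<byte[]> inbox = new ArrayBlockingQueue<>(1);
            inboxes.add(inbox);
            shards.add(pool.submit(() -> {
                Map<String, Long> shard = new HashMap<>();
                byte[] chunk;
                while ((chunk = inbox.take()) != DONE) {
                    // stand-in for real log parsing: count the first token of each line
                    for (String line : new String(chunk, StandardCharsets.UTF_8).split("\n"))
                        if (!line.isEmpty()) shard.merge(line.split(" ")[0], 1L, Long::sum);
                }
                return shard;
            }));
        }

        // Reader role, faked with a few in-memory "chunks" for the demo.
        byte[] demo = "1.2.3.4 GET /a\n5.6.7.8 GET /b\n1.2.3.4 GET /a\n".getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < 8; i++) inboxes.get(i % PARSERS).put(demo);
        for (BlockingQueue<byte[]> inbox : inboxes) inbox.put(DONE);

        // Merge step: combine each parser's shard into one total map.
        Map<String, Long> totals = new HashMap<>();
        for (Future<Map<String, Long>> shard : shards)
            shard.get().forEach((k, v) -> totals.merge(k, v, Long::sum));
        pool.shutdown();
        System.out.println(totals);   // {1.2.3.4=16, 5.6.7.8=8}
    }
}
```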
Reader
The reader thread is responsible for reading the file as fast as possible. We loop through the file using a random access file, reading 64kb Bufs. Once we read 64kb, we trim the buffer backwards to the last newline and reposition the file pointer so those trailing bytes are re-read with the next chunk. Remember, we only want this thread to read bytes; it doesn't do any character decoding into strings.
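The actual script uses Fan's Buf and random access IO; below is just a rough Java sketch of the same chunking idea using RandomAccessFile, with the hypothetical handoff method standing in for sending the chunk to a parser:

```java
import java.io.IOException;
import java.io.RandomAccessFile;

// Rough sketch of the reader's chunking loop: read 64KB, trim the buffer
// back to the last newline, and resume the next read at the trimmed point
// so the partial line is re-read at the start of the next chunk. No
// character decoding happens here; the chunk stays as raw bytes.
public class ChunkReader {
    static final int CHUNK = 64 * 1024;

    public static void readChunks(RandomAccessFile file) throws IOException {
        byte[] buf = new byte[CHUNK];
        long pos = 0;
        long len = file.length();
        while (pos < len) {
            file.seek(pos);
            int n = file.read(buf, 0, CHUNK);
            if (n <= 0) break;
            int end = n;
            if (pos + n < len) {                                   // not the last chunk
                while (end > 0 && buf[end - 1] != '\n') end--;     // trim back to last newline
                if (end == 0) end = n;                             // pathological: no newline in 64KB
            }
            byte[] chunk = java.util.Arrays.copyOf(buf, end);
            handoff(chunk);                                        // would be sent to a parser thread
            pos += end;                                            // next read re-covers the trimmed bytes
        }
    }

    static void handoff(byte[] chunk) {
        System.out.println("chunk of " + chunk.length + " bytes");
    }
}
```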
As we read buffers, ideally we'd just send them as async messages to the parser threads on a rotating basis. The problem is that we could quickly fill up the message queues with huge chunks of memory, so we need some flow control. The way I implemented flow control was to have the parsers send themselves to the reader's message queue when they are ready for more work. The reader waits for the next parser in its loop, reads the next chunk, and then sends it as an async message back to that parser. With this design we never enqueue more than one 64kb chunk on each parser's message queue.
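A sketch of that flow-control scheme in Java terms (hypothetical classes; BlockingQueue stands in for Fan's thread message queues):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the flow-control scheme: each parser registers itself with the
// reader when it is ready for work, so the reader never gets ahead of the
// parsers and no parser ever holds more than one pending chunk.
public class FlowControl {
    // The reader's inbox: parsers that are ready for another chunk.
    static final BlockingQueue<Parser> readyParsers = new ArrayBlockingQueue<>(64);

    static class Parser extends Thread {
        final BlockingQueue<byte[]> inbox = new ArrayBlockingQueue<>(1); // at most one chunk queued

        @Override public void run() {
            try {
                while (true) {
                    readyParsers.put(this);          // "send myself to the reader"
                    byte[] chunk = inbox.take();     // wait for the reader's reply
                    if (chunk.length == 0) return;   // empty chunk = no more work
                    parse(chunk);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void parse(byte[] chunk) { /* decode + totalize, omitted here */ }
    }

    // Reader side: wait for the next ready parser, then hand it a chunk.
    static void dispatch(byte[] chunk) throws InterruptedException {
        Parser next = readyParsers.take();
        next.inbox.put(chunk);
    }
}
```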
Parsers
The parsers are where we utilize the cores during the IO/totalization phase. Each parser thread is responsible for:
waiting for 64kb chunks from the reader
parsing those chunks into log records
totalizing the records
sending itself back to the reader as an async message for more work
The totals for a single parser thread are called a Shard. A shard contains the maps for the various log fields (hits, bytes, clients, etc). When the main thread joins with the reader thread, it sends null to each parser to get back its shard results.
I'm not sure what the optimal number of parsers is, but using 30 parsers I was able to parse the 218 million lines in 16min 43sec. This phase is by far where most of the runtime is spent. A lot of it is spent mapping bytes to java.lang.Strings. This is understandable, since we have to copy from byte[] into char[] under the covers using a charset. Maybe something in NIO can read from the file directly into char[] under the covers, but I didn't investigate.
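To make the shard idea concrete, here is a rough Java sketch of a per-parser shard and its totalizing loop; the field layout parsed here is a simplified placeholder, not the actual WideFinder log parsing:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Sketch of a parser's "shard": one set of counter maps per parser thread,
// merged with the other shards only at the end of the run.
public class Shard {
    final Map<String, Long> hits     = new HashMap<>();  // URI -> count
    final Map<String, Long> bytes    = new HashMap<>();  // URI -> bytes served
    final Map<String, Long> clients  = new HashMap<>();  // client IP -> count
    final Map<String, Long> notFound = new HashMap<>();  // 404 URI -> count

    // Decode a 64KB byte chunk into a String (this is the expensive
    // byte[] -> char[] conversion discussed above) and totalize each line.
    void totalize(byte[] chunk) {
        String text = new String(chunk, StandardCharsets.UTF_8);
        for (String line : text.split("\n")) {
            if (line.isEmpty()) continue;
            // Hypothetical simplified layout: client uri status size ...
            String[] f = line.split(" ");
            if (f.length < 4) continue;
            clients.merge(f[0], 1L, Long::sum);
            hits.merge(f[1], 1L, Long::sum);
            if ("404".equals(f[2])) notFound.merge(f[1], 1L, Long::sum);
            try { bytes.merge(f[1], Long.parseLong(f[3]), Long::sum); }
            catch (NumberFormatException ignored) {}
        }
    }
}
```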
Report
Once we've gotten all the shards, we spawn a report thread for each field: hits, bytes, clients, 404s, and refs. The report thread's responsibilities:
merge shards into one big Str:Int map
reverse sort the map values to find the threshold (10th highest value)
find all the key/value pairs with a value higher than threshold
write the report to a string
return the report string to the main thread to print
You could definitely do some clever things here to parallelize the merge/sort better, but this phase only consumes 51sec of the total run, so I stuck with a simple, brute-force design.
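As an illustration of that brute-force merge and top-10 selection, a minimal Java sketch (my own names, not the script's) might look like this:

```java
import java.util.*;

// Sketch of the report step for one field: merge the per-parser shard maps
// into one big map, find the 10th highest count as a threshold, and keep
// every entry at or above it.
public class Report {
    static Map<String, Long> merge(List<Map<String, Long>> shards) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> shard : shards)
            shard.forEach((k, v) -> total.merge(k, v, Long::sum));
        return total;
    }

    static List<Map.Entry<String, Long>> topTen(Map<String, Long> total) {
        // Reverse-sort the counts to find the threshold (10th highest value).
        List<Long> counts = new ArrayList<>(total.values());
        counts.sort(Comparator.reverseOrder());
        long threshold = counts.size() >= 10 ? counts.get(9) : 0;

        // Keep every key whose count meets the threshold, highest first.
        List<Map.Entry<String, Long>> top = new ArrayList<>();
        for (Map.Entry<String, Long> e : total.entrySet())
            if (e.getValue() >= threshold) top.add(e);
        top.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        return top;
    }
}
```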
katox Wed 15 Oct 2008

I was curious about this one. I wondered why Fan lost about 4 seconds to Java. Unfortunately it's not obvious, as the sample Java program uses some kind of complex framework (how significant!).
Since there is some test data available, I just wanted to run the Fan sample. Unfortunately it doesn't work anymore. Even after I update the syntax and adapt to the Str.split API change, I still get an exception:
Brian, is this temporary due to recent changes towards value-type support, or has something changed elsewhere?
brian Wed 15 Oct 2008
It is hard to compare my Fan solution to other Java-based solutions b/c the code is so different - I was going for a small number of lines versus speed. But I suspect even with the same code structure Fan will be slower because everything is boxed (something we will rectify soon :-). Java as a whole fared poorly with its Unicode strings versus byte-oriented solutions - a huge amount of time is spent decoding bytes into Unicode and performing Unicode-aware regex.
The specific issue you are seeing is because Fan needs some type of escape hatch to allow unsafe mutable message passing. See Issues from WideFinder for a more general discussion. As a temporary fix you can comment out the calls to Namespace.safe in "Thread.java".