2015-03-24

Streaming Logs with Transducers and Ramda

Ramda is a library that helps you write JavaScript without curly braces. In this article we will use Ramda to parse a log file without curly braces (and introduce transducers along the way).

Say we are interested in preparing a list of pages visited by IP address, filtered from a log file that includes extra cruft that we don't care about.

The log file has the following format:

127.0.0.1 - - [26/Feb/2015 19:25:25] "GET /static/r.js HTTP/1.1" 304 -
127.0.0.5 - - [26/Feb/2015 19:27:35] "GET /blog/ HTTP/1.1" 200 -
127.0.0.1 - - [28/Feb/2015 16:44:03] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [28/Feb/2015 16:44:03] "POST / HTTP/1.1" 200 -

We want to skip all requests to static resources and print only the IP address and the URL visited.

127.0.0.5 visited http://simplectic.com/blog/
127.0.0.1 visited http://simplectic.com/

We'll get started by defining a few helper functions.

Utils

Let's begin by defining a function that tests whether a line in the log file is a request for a page.

var R = require('ramda')

var isGet = R.test(/GET \//)
var notStatic = R.complement(R.test(/GET \/static/))
var isPage = R.allPass([isGet, notStatic])

R.filter(isPage, [
  '127.0.0.1 - - [26/Feb/2015 19:25:25] "GET /static/r.js HTTP/1.1"',
  '127.0.0.5 - - [26/Feb/2015 19:27:35] "GET /blog/ HTTP/1.1" 200 -',
  '127.0.0.1 - - [28/Feb/2015 16:44:03] "GET / HTTP/1.1" 200 -',
  '127.0.0.1 - - [28/Feb/2015 16:44:03] "POST / HTTP/1.1" 200 -'])
// => [ '127.0.0.5 - - [26/Feb/2015 19:27:35] "GET /blog/ HTTP/1.1" 200 -',
//      '127.0.0.1 - - [28/Feb/2015 16:44:03] "GET / HTTP/1.1" 200 -' ]

We first define a predicate that matches any GET / request using R.test. We then use R.complement with a test for GET /static to remove all requests to static resources. Finally we combine the predicates using R.allPass.

Since we only care about the IP address and URL, let's extract those fields using R.match.

var splitLine = R.pipe(
      R.match(/^(\S+).+"([^"]+)"/),
      R.tail),

R.map(splitLine, [
  '127.0.0.5 - - [26/Feb/2015 19:27:35] "GET /blog/ HTTP/1.1" 200 -',
  '127.0.0.1 - - [28/Feb/2015 16:44:03] "GET / HTTP/1.1" 200 -'])
// => [ [ '127.0.0.5', 'GET /blog/ HTTP/1.1' ],
//      [ '127.0.0.1', 'GET / HTTP/1.1' ] ]

This function uses R.pipe to run two functions in sequence. First, we use a regular expression to extract the IP address and the quoted request. The result is passed through R.tail to end up with an array of the two matches (the first item is the full matched string).

We also want to convert the request to a URL.

var toURL = R.pipe(
      R.split(' '),
      R.slice(1, 2),
      R.prepend('http://simplectic.com'),
      R.join(''))

R.map(toURL, [
  'GET /blog/ HTTP/1.1',
  'GET / HTTP/1.1'])
// => ['http://simplectic.com/blog/',
//     'http://simplectic.com/' ]

We again use R.pipe to compose a sequence of functions. We first split the string on spaces and extract the path at index 1 using R.slice. We then prepend the domain to the path and join the array back to a string.

This function works on strings, but R.splitLine returns a list of pairs. We want to transform the second item while leaving the first intact. One way to do this is to define a lens that operates on the second item.

// var valueLens = R.lensIndex(1)
var valueLens = R.lens(
      // (entry) => entry[1]
      R.last,
      // (value, entry) => [entry[0], value]
      R.flip(R.useWith(Array, R.head)))

The first function used to create the lens is the "getter" and uses R.last to grab the second item in the pair. The second function is a "setter" that accepts a new value and the original pair. The function returns a new pair with the original first value and the new second value in the pair. This is a reusable function that will operate on the value of any pair.

This pattern is so useful that Ramda will include R.lensIndex which could be used directly here.

Now that we have a reusable lens on the value of the pair, we can map the value using toURL defined above.

var valueToUrl = valueLens.map(toURL)

R.map(valueToUrl, [
  [ '127.0.0.5', 'GET /blog/ HTTP/1.1' ],
  [ '127.0.0.1', 'GET / HTTP/1.1' ] ])
// => [ [ '127.0.0.5', 'http://simplectic.com/blog/' ],
//      [ '127.0.0.1', 'http://simplectic.com/' ] ]

We join the string with "visited" and add back the new line we lost when splitting on lines.

var joinVisited = R.pipe(
    R.join(' visited '),
    R.add(R.__, '\n'))

R.map(joinVisited, [
 [ '127.0.0.5', 'http://simplectic.com/blog/' ],
 [ '127.0.0.1', 'http://simplectic.com/' ] ])
// => [ '127.0.0.5 visited http://simplectic.com/blog/\n',
//      '127.0.0.1 visited http://simplectic.com/\n' ]

Finally, we need to concatenate all strings from this array. We could use R.join, but we will use R.into for fun.

R.into('', R.identity, ['hello\n', 'world\n'])
// => hello
//    world
//

We now have the tools we need to parse the log.

Attempt 1

Let's combine our utility functions to parse the log file using a pipeline.

var fs = require('fs')
var logFile = fs.readFileSync('access.log', {encoding: 'utf8'})

var parseLog = R.pipe(
      R.split('\n'),
      R.filter(isPage),
      R.map(splitLine),
      R.map(valueToUrl),
      R.map(R.join(' visited ')),
      R.map(R.add(R.__, '\n')))

var out = R.into('', R.identity, parseLog(logFile))
console.log(out)

We read the log file as a string and split each line. We filter the pages, extract a pair of IP Address and request, map each request to a URL and join each pair back to a string for each line. Finally we join the array of transformed lines back into a string and print the result.

So what is that second argument to into? It's a transducer, of course! That's cool. But what's a transducer?

Attempt 2

Let's rewrite our log parser to use a transducer.

var lines = require('transduce/string/lines')
var logFile = fs.readFileSync('access.log', {encoding: 'utf8'})

var parseLog = R.compose(
      lines(),
      R.filter(isPage),
      R.map(splitLine),
      R.map(valueToUrl),
      R.map(R.join(' visited ')),
      R.map(R.add(R.__, '\n')))

var out = R.into('', parseLog, [logFile])
console.log(out)

So what's changed? Not much, really. We changed R.pipe to R.compose, used lines instead of R.split and passed in the log file as a single item array.

OK, so what's going on here? Many functions in Ramda, including filter and map can act as a transducer if executed in the right context, such as R.into. The first argument to into is the output, the second defines a transformation to execute, and the third argument is the input source. In this case, we are using a single item array of strings as the input, which is transformed using the transducer created from the composition of other transducers, and concatenated to the output string.

We needed to change from pipe => compose because of the nature of transducers. Although transducers can be composed directly, the execution of the transformation is reversed. This means any time you would use R.pipe for arrays, you would use R.compose for transducers, and vice-versa. Why is this? I'll leave that as an exercise to the reader. (It does make sense when you look at an implementation and squint a little, but I don't want to get side tracked here.)

We also changed the first transformation to use lines from transduce. We did this because R.split does not work as a transducer, and R.into requires an array or an iterable as the input source. Using lines also allow us split across multiple string fragments, which will come in handy later.

This shows that transducers in Ramda can be composed with transducers in other libraries implementing a common protocol, including transducers-js and transducers.js. Want to use a transducer from one of those libraries with Ramda? Go right ahead!

That's neat. But what does this buy us? For one, transformations defined by composing transducers will be executed without creating intermediate aggregates. When we executed the functions directly above, we created arrays after each step. These intermediate arrays were also created when executing the pipeline in our first attempt. By using transducers we can transform each string from the input, as it is iterated, and append the transformed strings directly to the output.

But that's not all...

Attempt 3

Some of you may have started fuming when you saw readFileSync. Everyone knows we should be using a Stream! What were we thinking?

Time for a rewrite.

var lines = require('transduce/string/lines')
var stream = require('transduce-stream')

var parseLog = R.compose(
      lines(),
      R.filter(isPage),
      R.map(splitLine),
      R.map(valueToUrl),
      R.map(R.join(' visited ')),
      R.map(R.add(R.__, '\n')))

process.stdin.pipe(stream(parseLog)).pipe(process.stdout)
process.stdin.resume()

Compare parseLog to the previous attempt. What changed? Anyone? That's right. Nothing.

Remember parseLog is a transducer. This means we can use it in other contexts that work with transducers. One such context is transduce-stream. It's like through2 but with transducers. Other contexts include RxJS, Highland.js, Kefir.js and more. You can use functions that act as transducers (and compositions thereof) from Ramda (and elsewhere) directly with any of these contexts. Nice.

Remember that lines can treat multiple fragments as a single string? The source of the fragments is now stdin and the transformation is streamed directly to stdout.

Let's try it out. If you weren't following along, you can find the full script and example log file on GitHub.

# Term 1
$ tail -f access.log | node pagevisits.js
127.0.0.1 visited http://simplectic.com/blog/
127.0.0.1 visited http://simplectic.com/projects/

# Term 2
$ echo '127.0.0.1 - - [24/Mar/2015 20:16:49] "GET / HTTP/1.1" 200 -'
  >> access.log

# Term 1
127.0.0.1 visited http://simplectic.com/

Take a minute to let that sink in. We are using Ramda functions to follow a log file and view the transformation as it is updated. That's pretty sweet.

Further Study

No really, what's a transducer? You may be interested in:

Want to be notified of new articles? Follow @simplectic on Twitter.