Monday, March 30, 2009

Commodity Programming

Some musings from the International Lisp Conference 09 ..

From Jao Ortega, noted Lisper and Haskeller, in his Sussmaniana report from ILC 09 ..

"Next day we were in a kind of tongue-in-check debate provocatively entitled Are macros a menace?. Richard Gabriel was on the wrong side, and arguing along the lines that macros were akin to language design and that he’d rather not suffer the consequences of letting your average software engineer undertake such a complex task. Gerry’s intervention at this point made me again nod like as i was mad: if we cannot trust our software enginneers to proficiently use the really powerful tools of our trade, there must be something wrong in the way we educate them; only those able to judiciously use them should get a diploma, to begin with."

From the blogs of Andy Wingo, reporting on ILC 09 .. on the same discussion "Macros: Are they a menace?" ..

"More seriously, the arguments against macros centered on control: control by bosses on workers, with the idea that macros make custom languages, thus making individual programmers less replaceable."

and by now all of us know why MIT switched from Scheme to Python for their introductory programming course, 6.001. This discussion was also kicked off during the interlude to the macro debate, when Pascal Costanza asked Gerry Sussman the reason. Read Andy's post for more details ..

Sigh! Dumbing down of powerful language features in favor of strawman arguments - and this being debated at Lisp conferences. The fangs of Enterprise software development?

Monday, March 23, 2009

Learning Haskell - Laziness makes you think differently

Haskell is pervasively lazy - lazy by default, unless explicitly forced to be strict. This is a big change from where I am coming from. Java has nothing lazy to write home about: a handful of constructs are lazy by default, but there is no support for making user defined operations lazy. Scala offers call-by-name semantics along with nice syntactic sugar that allows user defined control structures to look like built-in ones. Scala also allows immutable values to be annotated with the lazy keyword; they are evaluated on demand and memoized for subsequent use.
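For reference, here is roughly what those two Scala features look like when pasted into the REPL - a by-name parameter that lets you roll your own control structure, and a lazy val that is evaluated on first access and memoized (the names below are only for illustration) ..

// call-by-name: body is not evaluated unless the condition fails,
// so unless behaves like a built-in control structure
def unless(cond: Boolean)(body: => Unit): Unit =
  if (!cond) body

// lazy val: initialized on first access, memoized afterwards
lazy val expensive: Int = {
  println("computing ..")
  42
}

unless(1 > 2) { println("1 is not greater than 2") }
println(expensive)   // prints "computing .." and then 42
println(expensive)   // prints 42 only - already memoized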

Haskell is about purity, and once they decided to be lazy, the purity had to be enforced. Of course it has some great consequences ..

Infinite data structures ..

Prelude> take 5 [1..]
[1,2,3,4,5]

or the beautiful fibs to generate all fibonacci numbers ..

Prelude> let fibs = 0 : 1 : zipWith (+) fibs (tail fibs)
Prelude> take 10 fibs
[0,1,1,2,3,5,8,13,21,34]


And infinite data structures enable us to write code that is almost as descriptive as the specification in English ..

primes :: [Int] -> [Int]
primes (n:ns) = n : primes (filter (\v -> v `mod` n /= 0) ns)
-- e.g. take 10 (primes [2..]) => [2,3,5,7,11,13,17,19,23,29]


Or the beautiful call-by-need semantics to process recursive data structures (aka Tying the knot) efficiently and in a single pass for operations that would otherwise need multiple passes in a purely functional language. I had talked about the same trick in solving the Josephus Flavius game in an earlier post.

In summary, purity of functions and lazy evaluation offer better composition capabilities, including compiler optimizations (deforestation) that help eliminate intermediate data structures.

But Haskell's purity and lazy evaluation also have their share of warts, particularly for a newcomer ..

In languages like Scala or OCaml that offer optional laziness, what I want to be lazy about is made explicit. And since purity is not enforced by the compiler, I, the programmer, need to ensure that side effects don't ruin my attempts at deforestation. Nothing happens as an undercurrent that rears its ugly head only at runtime as non-linear stack growth. Execution is otherwise strict, and stack space is deterministic, as long as you stick to basic best practices like keeping your functions tail recursive.

It is not that Haskell's lazy-by-default model is worse, but for a journeyman making his first foray into the world of Haskell, it requires a different way of thinking. There is a price of purity that you have to pay, where the programming model may not always map to the performance characteristics of the execution. Modularity of code may not translate into modularity of the execution model. This is not necessarily bad, but it takes quite a bit of Haskell experience to appreciate this orthogonality. I remember Dan Piponi touching upon this subject in one of the related discussions .. can't find it now ;-(.

Now look at this seemingly efficient tail recursive factorial ..

fact n acc | n < 1     = acc
           | otherwise = fact (n-1) (acc*n)


Lazy evaluation builds up thunks that do not get evaluated until they are actually needed - the accumulator grows into a chain of unevaluated multiplications, making tail recursion an ineffective vehicle for space optimization here. You need foldl', the strict version of the left fold combinator, as in ..

import Data.List (foldl')
fact n = foldl' (*) 1 [1..n]

Have a look at this exercise by Don Stewart, where he fixes the space blowup of the apparently intuitive function for calculating the mean of a list of doubles.

mean :: [Double] -> Double
mean xs = sum xs / fromIntegral (length xs)


has to be transformed to

mean :: [Double] -> Double
mean = go 0 0
    where
        go :: Double -> Int -> [Double] -> Double
        go s l []     = s / fromIntegral l
        go s l (x:xs) = go (s+x) (l+1) xs


to ensure the program runs in constant space with lazy lists. The exercise is not trivial to a beginner's eye, since you have to sidestep your intuition in favor of a more complex alternative.

Many experienced Haskellers also struggle with space optimization. The thing to keep in mind is that the compiler is your friend. But unless you are careful enough to check the "core" that ghc produces as part of its optimization-by-transformation phases, these thunks can get out of hand. Space that would have been reclaimed much earlier under strict evaluation is left allocated in the form of thunks under lazy evaluation.

Friday, March 20, 2009

Now serving - Message Queuing in Web Applications

Looks like asynchronous messaging is becoming quite a common paradigm of distribution in Web applications and large Web sites. The general theory is to do only the absolutely essential user-facing work as part of the synchronous HTTP request/response cycle, and delegate the rest to an asynchronous queuing service to process offline. This leads to faster page reloads, optimal resource utilization and a much better user experience. Gojko Adzic has a great post clarifying the misconception that messaging systems are only useful for big investment banks.
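As a rough sketch of the pattern (not any particular site's implementation - the notification processing below is hypothetical), the request handler does only the essential work synchronously and fires the rest off as a message to an actor playing the role of the job queue ..

import scala.actors.Actor
import scala.actors.Actor._

// message handed off to the offline queue
case class PhotoUploaded(userId: String, photoId: String)

// the "queue" - an actor that processes notifications asynchronously
object NotificationQueue extends Actor {
  def act = loop {
    react {
      case PhotoUploaded(user, photo) =>
        //.. notify contacts, third party partners etc. - offline work
        println("notifying contacts of " + user + " about " + photo)
    }
  }
}

object UploadService {
  NotificationQueue.start

  // part of the synchronous request/response cycle
  def handleUpload(userId: String, photoId: String) = {
    //.. do only the absolutely essential stuff inline: persist, ack the user
    NotificationQueue ! PhotoUploaded(userId, photoId)  // delegate the rest
    "upload accepted"
  }
}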

And here are some real life use cases ..

  • Flickr uses offline queuing systems to process notifications to contacts and third party partners when a user uploads a photo. Informing the user is of prime importance, which Flickr does as part of the synchronous request/response processing, while the other notifications are processed by job queues running in parallel.

  • Digg uses Gearman to farm out similar processing of jobs in parallel while serving the absolutely essential stuff synchronously.

  • And as recently presented at QCon by Evan Weaver, Twitter is becoming more and more centered around messaging middleware. Every incoming tweet triggers the messaging system to process notifications to all followers, and this is done asynchronously through Kestrel, a message queue implementation written in Scala.


All these use cases lead to the conclusion that message queuing is becoming an increasingly potent medium of distribution, even in Web based applications. The key point is eventual consistency, which leads to a better scalability model. And, by the way, it looks to be about time that open source messaging platforms start gathering more steam, so that every shop does not have to reinvent the wheel and develop its own message queuing infrastructure.

Sunday, March 15, 2009

Real world event based solutions using MINA and Scala Actors

In the QCon London track Architectures in Financial Applications, Oracle presented an interesting solution that seeks to transform reconciliation from an inherently batch process into an incremental, event driven, asynchronous one. The solution is based on Coherence, their data grid, which addresses the capacity challenge by spreading trades across servers and the availability challenge through the resilience of the continuous replication that Coherence supports. As part of the presentation, the Coherence based solution touched upon one important paradigm which can be adopted even outside grid based architectures to improve system performance and throughput.

Asynchronous processing ..


As the Oracle presentation rightly pointed out, one of the recurring problems in today's investment management solutions is the time pressure the back office faces with its regular overnight batch programs (reconciliations, confirmations etc.), struggling to complete them before the next day's trading starts. Reconciliation is a typical example: it needs to be executed at various levels between entities like traders, brokers, banks and custodians, and with varying periodicities. The main bottleneck is that these processes run as monolithic batch programs operating on monstrous end-of-day databases containing millions of records that need to be processed, joined, validated and finally aggregated for reconciliation.

Enter Event based Reconciliation ..


This solution is not about grids. It is about a poor man's asynchronous event based processing that transforms the end-of-day batch job into incremental, MapReduce style progressions that move forward with every event, do real time processing, and raise alerts and events for investment managers to respond to and make more informed decisions. One of the main goals of reconciliation being risk mitigation, I suppose such real time progressions will result in better risk handling and management. You also escape the time pressure that mandatory completion of today's batch processes implies, and get a more predictable and uniform load distribution across your array of servers.

And this proposed solution uses commodity tools and frameworks, good old proven techniques of asynchronous event processing, and never claims to be as scalable as the grid based one proposed by Coherence.

Think asynchronous, think events, think abstraction of transport, think selectors, think message queues, think managed thread-pools and thousands of open sockets and connections. After all you need to have a managed infrastructure that will be able to process your jobs incrementally based on events.

Asynchronous IO abstractions, MINA and Scala actors ..


Instead of one batch process for every reconciliation, we can think of handling reconciliation events. For example, loading STREET trades is an event that can trigger reconciliation with HOUSE trades. Or receipt of Trade Confirmations is an event for reconciliation with our placed Orders. We can set this up nicely using generic socket based endpoints that listen for events, with event dispatch callbacks that do all the necessary processing. MINA provides nice abstractions for registering sockets, managing IoSessions and handling I/O events on your registered sockets, along with all the necessary glue for protocol encoding and decoding.

Here is a brief thought sequence ..



We can use MINA's asynchronous I/O service that abstracts the underlying transport's connection.

// set up the thread pool
executor = Executors.newCachedThreadPool()

// set up the socket acceptor with the thread pool
acceptor = new NioSocketAcceptor(executor, new NioProcessor(executor))

//.. set up other MINA stuff


Using MINA we can decouple protocol encoding / decoding from the I/O, and have separate abstractions for codec construction. Note how the responsibilities are nicely separated between the I/O session handling, protocol filters and event handling.

// add protocol encoder / decoder
acceptor.getFilterChain.addLast("codec", 
  new ProtocolCodecFilter(//.., //..))


Now we need to register an IoHandler, which will handle the events and call the various callbacks like messageReceived(), sessionClosed() etc. Here we would like to be more abstract so that we do not have to handle all complexities of thread and lock management ourselves. We can delegate the event handling to Scala actors, which again can optimize on thread usage and help make the model scale.

// set the IoHandler that delegates event handling to the underlying actor
acceptor.setHandler(
  new IoHandlerActorAdapter(session => new ReconHandler(session, ...)))

// bind and listen
acceptor.bind(new InetSocketAddress(address, port))


So, we have a socket endpoint where clients can push messages which result in MINA events that get routed through the IoHandler implementation, IoHandlerActorAdapter and translated to messages which our Scala actor ReconHandler can react to.

The class IoHandlerActorAdapter is adapted from naggati, Twitter's DSL for protocol decoding ..


class IoHandlerActorAdapter(val actorFactory: (IoSession) => Actor) 
  extends IoHandler {
  //..

  //.. callback
  def messageReceived(session: IoSession, message: AnyRef) = 
    send(session, MinaMessage.MessageReceived(message))

  // send a message to the actor associated with this session
  def send(session: IoSession, message: MinaMessage) = {
    val info = IoHandlerActorAdapter.sessionInfo(session)
    for (actor <- info.actor; if info.filter contains MinaMessage.classOfObj(message)) {
      actor ! message
    }
  }
  //..
}




and the class ReconHandler is the actor that handles reconciliation messages ..

class ReconHandler(val session: IoSession, ...) 
  extends Actor {

  //..
  //..

  def act = {
    loop {
      react {
        case MinaMessage.MessageReceived(msg) =>
          // note we have registered a ProtocolCodecFilter
          // hence we get msg as an instance of specific
          // reconciliation message
          doRecon(msg.asInstanceOf[ReconRequest])

        case MinaMessage.ExceptionCaught(cause) => 
          //.. handle

        case MinaMessage.SessionClosed =>
          //.. handle

        case MinaMessage.SessionIdle(status) =>
          //.. handle
      }
    }
  }

  private def doRecon(request: ReconRequest) = {
    request.header match {
      case "TRADE_CONF_ORDER" => 
        //.. handle reconciliation of confirmation with order

      case "TRADE_STREET_HOME" =>
        //.. handle reconciliation of stree side with home trades

      //..
  }
  //..
}


Note that the above strategy relies on incremental progress. The entire process may not get done upfront; we may have to wait till the close of trading hours before we receive the last batch of trades or position information from upstream components. But the difference from the big bang batch process is that by then we have already progressed quite a bit, and possibly raised some alerts as well, which would not have been possible under the earlier strategy of execution. Another way to view it is as an implementation of MapReduce that is processed incrementally throughout the day, on a real time basis, and comes up with the eventual result much earlier than a scheduled batch process.

Wednesday, March 11, 2009

Web programming at a higher level of abstraction

Frameworks like Sproutcore, Cappuccino and its language Objective-J have given the Web programming model a huge leap. To date, one of the pet peeves of Web programmers has been that the programming model is based on the native DOM and the accidental complexities that HTML, Javascript and CSS bring along. The crux of the problem is not the polyglotism involved in Web development, but that these technologies often compete with overlapping responsibilities. Frameworks like Prototype and jQuery take care of cross browser compatibilities, but you are still never sure whether to create a button in HTML or in Javascript code, or whether to use CSS files or apply styles in Javascript. In summary, we have so far been programming against lower level abstractions, which should ideally be abstracted away in higher level frameworks.

And Cappuccino does address this issue head-on. It turns polyglotism on its head and defines a common way to do things on the Web.

I was listening to the interview of the 280 North guys through a post on Ajaxian, where Francisco Tolmasky, the co-founder of 280 North, mentioned in one of the comments ..

"JavaScript does not have any way of writing applications today in the same way mature platforms like Cocoa do. Cappuccino provides you with very high level features like copy-paste (of anything), undo and redo, document management and archiving, vector graphics and animation, etc etc. when you write an application for Mac OS X or iPhone, you can truly focus *just* on your application, not whether it will work on some obscure browser or how to get something as simple as undo working, and that’s precisely what we are trying to provide."

As this Cappuccino blog post clearly indicates, the only purpose behind Objective-J was to provide the right level of abstraction for Cappuccino to be built on top of it ..

"The purpose behind Objective-J was to facilitate the development of Cappuccino, and when we originally set out to do that we simply wanted to add a few key missing features to the existing JavaScript “standard”. In other words, Objective-J is our take on JavaScript 2.0."

To me, the most important takeaway from all this is that you can now develop rich applications in the browser alone, without having to depend on any proprietary runtime. Google Web Toolkit has been generating compiled, highly optimized Javascript since 2006; it will be really interesting to see how the compiler optimized Javascript generated by GWT compares in runtime performance with the on-the-fly generated Javascript offered by Cappuccino.

We are seeing more and more momentum behind Javascript as a compilation target. Sometime back I read that Gilad Bracha was also thinking in terms of a Newspeak port on V8. With invokedynamic being blessed by Sun, dynamic language compilation will improve and we will see more and more scripting languages targeting the JVM. This is the right time to embrace frameworks that promise higher level abstractions in Javascript, making the browser and the Web a more productive ecosystem.

Monday, March 09, 2009

Bloom Filters - optimizing network bandwidth

The Bloom filter is one of the coolest data structures around - elegant as well as incredibly useful. I had a short post on a cool application of bloom filters: front-ending access to disk based data to achieve improved throughput in query processing. I didn't know Oracle uses bloom filters for processing parallel joins and for join filter pruning. In the case of parallel joins, the idea is quite simple ..

  1. Each slave process prepares a bloom filter for the join condition that it is processing

  2. It then passes the bloom filter on to the other slave processes, which apply the filter to their own selected sets of records before passing the final set on to the join coordinator


Remember each of the above processes may run in a distributed environment - hence this technique means less data is transported across nodes, saving network bandwidth in exchange for some extra CPU cycles. This paper describes all of this in full detail, with illustrative examples.
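A minimal sketch of the flow in Scala - the bloom filter here is a naive bit-set version, just to show the shape of a bloom join; a real implementation would choose its hash functions and sizes much more carefully ..

import scala.collection.mutable.BitSet

// a naive bloom filter: k hash functions simulated by salting hashCode
class BloomFilter(size: Int, hashes: Int) extends Serializable {
  private val bits = new BitSet(size)
  private def indices(key: String) =
    (1 to hashes).map(i => (((key + i).hashCode % size) + size) % size)

  def add(key: String) = indices(key).foreach(bits += _)
  def mightContain(key: String) = indices(key).forall(bits.contains)
}

object BloomJoin {
  // slave 1: build a filter over its join keys and ship only the filter
  def buildFilter(joinKeys: Seq[String]) = {
    val bf = new BloomFilter(1 << 16, 3)
    joinKeys.foreach(bf.add)
    bf
  }

  // slave 2: drop rows that cannot possibly join before sending them on
  def prune(rows: Seq[(String, String)], bf: BloomFilter) =
    rows.filter { case (key, _) => bf.mightContain(key) }
}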

This idea of serializing bloom filters instead of data sets has been used quite extensively in load balancing MapReduce operations, to minimize intermediate results before sending everything across the network for final aggregation. In the case of distributed join operations, we may need to compose multiple bloom filters to get the final dataset. Bloom joins, as they are called, allow cheap serialization of filters over the wire, employing some clever techniques like linear hash tables and multi-tier bloom filters, as described in this paper in the Comonad Reader.

Bloom joins can also be used effectively in MapReduce processing with CouchDB: the map phase can produce the bloom filters, which can then be joined in the reduce phase. In a recent application, I needed to store a very large list mainly for set operations. Instead of storing the individual elements, I decided to store a bloom filter, which fit nicely in a memcached slab. I could pull it out and do all sorts of bit operations easily, and it's blindingly fast. Next time you decide to distribute your huge list in Terracotta, think again - distributing a bloom filter instead may be a lighter weight option. There are use cases where you will only be doing membership checks, and some false positives may not do much harm.

Monday, March 02, 2009

Your data model can speak different languages too

Andrej Koelewijn writes ..

  • REST is about resources. Resource is just another word for object, or record.

  • REST is also about URLs. URLs that identify resources. Just like ids can identify objects or records.

  • And, often overlooked, REST is also about links. Resources use URLs to link to other resources. Just like foreign keys can link records to other records.


and finally concludes ..

REST is a distributed data model

There is no denying the fact that REST is based on the central concept of Resources - addressable entities that can be manipulated by all components of the communicating network through a standardized and uniform interface. Resources abstract the underlying data model from the user, and the application developer can design nice REST based APIs to the external world on top of them. REST abstracts the data model; it is NOT the data model - in fact the underlying data representations can change without any impact on the API.

In another related post, talking about the relational data model, he also mentions ..

This is why RDBMSes are so great: it doesn’t bind your data to a single application.

An RDBMS does bind your data to a specific application model. What it abstracts from the application is the underlying physical representation of the data. In fact it is this specific binding, and the constraints it imposes on the data model, that makes it so difficult to work with semi-structured data. You design the schema upfront, define relationships and constraints along with the schema, and organize your data around them. This is one of the reasons why we need separate data models for writes (normalized) and for queries and reports (denormalized).

In an earlier post, I had drawn upon some of my thoughts on Data 2.0, as I see it in applications today. The fact is that data is no longer viewed as something to be abstracted into a uniform storage and dredged out using a single query language. Gone are the days when we used to think of the ORM as the grand unifier for all database platforms. Both the O and the R are fast losing ubiquity in today's application development context; at least they are not as universal as they were half a decade back.

Things are surely moving towards polyglotism in the data modeling world as well. Not all of your data needs to be in the relational database - distribute data to where it belongs. Sometime back Alex Miller had a post that suggested distributing and modeling data based on lifetimes. Gone are the days when you needed to persist your conversational data in the database to scale out your application. Modern day grid platforms like Terracotta and Gigaspaces offer network attached memory that stores this data for you in a form much closer to the application model, along with transparent clustering of your application.

Not all data needs a relational model. In a typical application there is data that is document oriented and does not need a fixed schema attached to it. It can be stored as key-value pairs, and its primary reason for existence is to support fast - really fast - inserts, updates and key based lookups. The semantics of the data inside the value are fairly opaque, and it does not need any relational constraints tying it to the rest of the data model. Why coerce such a simple model into one that forces you to pay the upfront tax of normalization, constraint enforcement, index rebuilding and joins? Think simple, think key-value pairs. And we have lots of them being used in the application space today. Rip such data out of your relational model into lightweight transactional stores that scale easily and dynamically. Long lived persistent data can, however, happily stay around within the confines of your normalized relational model.

One of the significant advantages of storing data in key/value stores is that your persistent data is mapped more closely to the objects and classes of your application. This reduces the cognitive dissonance that the relational data model imposes on the application.

But what about queries that fetch relevant records from the key/value stores based on user defined criteria?

Obviously primary key based fetches are not always enough in practical applications. Many distributed key/value stores provide the capability to index on custom defined filters and, in conjunction with full text search engines like Lucene, return collections of selected entries from the data store. Have a look at this article demonstrating how Sphinx, a full text search engine, can be integrated with MemcacheDB, a distributed key-value store which conforms to the memcached protocol and uses Berkeley DB as its storage back-end.

CouchDB provides an interesting view model that offers the capability to aggregate and query documents stored in the database through the map-reduce paradigm. Users define map functions to model the query, and reduce functions for the subsequent aggregates that make the relevant data available.
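The same shape, sketched in Scala rather than CouchDB's Javascript, just to make the paradigm concrete (the document structure below is made up) ..

object ViewSketch {
  case class Doc(id: String, kind: String, amount: Double)

  // "map": emit (key, value) pairs from each document
  def mapFn(d: Doc): Seq[(String, Double)] =
    if (d.kind == "trade") Seq(d.id.take(3) -> d.amount) else Seq.empty

  // "reduce": aggregate the values emitted against a key
  def reduceFn(values: Seq[Double]): Double = values.sum

  // the "view": group the emitted pairs by key and reduce each group
  def view(docs: Seq[Doc]): Map[String, Double] =
    docs.flatMap(mapFn)
        .groupBy(_._1)
        .map { case (k, kvs) => k -> reduceFn(kvs.map(_._2)) }
}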

So, now that your data model is polyglotic, there can be situations where you may need to synchronize data across multiple storage engines. Here is an example ..

In a real life trading application, a huge volume of trade messages needs to be processed by the back office within a very short time interval. This calls for scaling out to a throughput that is much easier to achieve with the light payloads of schemaless, amorphous key/value stores than with traditional relational databases. You just need to validate the message and store it against the trade reference number. Also, these peer based distributed key/value databases offer easy bi-directional replication and updates to shared data in disconnected mode, which can make offline message processing, followed by synchronization later on, quite affordable. Hence it makes perfect sense to offload this message processing from the database and deploy clusters of key/value stores.

However, there is a second level of processing that needs to be done, which updates all books, accounts and customer balances based on each individual trade. And this information needs to be stored as a system of record for various queries, reports, audit trails and other downstream processing. One way to achieve this is to push the second level of processing to scheduled queue jobs that asynchronously operate on the key/value store, do all the relevant heavy lifting on the data, and finally push the balances to the back end relational database. Currently we do everything as one synchronous ACID transaction against an RDBMS. Distribute the model based on data lifetime, rely on asynchronous processing, and what you get is eventual consistency with the goodness of better scalability.
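Here is a hedged sketch of that split in Scala - the store and the trade message below are made-up placeholders; the point is only the separation between the fast synchronous path and the asynchronous second-level posting ..

import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.mutable

case class TradeMessage(refNo: String, payload: String)

// stand-in for a distributed key/value store keyed by trade reference number
object TradeStore {
  private val store = mutable.Map[String, String]()
  def put(refNo: String, payload: String) = store += (refNo -> payload)
  def get(refNo: String) = store.get(refNo)
}

// second-level processing: updates books, accounts and balances in the RDBMS
object Poster extends Actor {
  def act = loop {
    react {
      case TradeMessage(refNo, _) =>
        //.. heavy lifting, then write to the system of record
        println("posting balances for trade " + refNo)
    }
  }
}

object MessageProcessor {
  Poster.start

  // fast path: validate and store against the trade reference number
  def accept(msg: TradeMessage) = {
    require(msg.refNo.nonEmpty)            // validation (simplified)
    TradeStore.put(msg.refNo, msg.payload) // cheap key/value write
    Poster ! msg                           // defer the second-level processing
  }
}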

A few days back I was listening to this Scaling DIGG episode with Joe Stump. He mentioned repeatedly that it's not the language - the bottleneck is the IO and the latency resulting from the IO. And the database is the single biggest bottleneck in your architecture. More specifically, it is NOT the database per se, but the synchronous communication between the application tier and the persistent data layer. This is the reason why DIGG is architected around Gearman, MemcacheDB and MogileFS. Not all sites need the scalability of DIGG, of course. But even for applications that need only a fraction of DIGG's scalability, architecting them away from strictly synchronous, ACID transaction oriented data sources is the way to go.

Sunday, March 01, 2009

Preemptive Bloom Filter

Michael Mitzenmacher describes a great trick with a bloom filter and a hash table that has been used in Longest Prefix Matching techniques to improve performance by reducing the number of dependent memory lookups.

Just in case you are not familiar with bloom filters, have a look at the great introduction in Wikipedia ..
The Bloom filter, conceived by Burton H. Bloom in 1970, is a space-efficient probabilistic data structure that is used to test whether an element is a member of a set. False positives are possible, but false negatives are not. Elements can be added to the set, but not removed (though this can be addressed with a counting filter). The more elements that are added to the set, the larger the probability of false positives.


Suppose you are doing lookups of large keys in a slow hash table, and the lookups are primarily negative, i.e. in most cases the lookup will fail. In such cases you can front-end the (slow) hash table lookup with a bloom filter over the keys, kept in fast memory. Since most of your lookups are likely to fail, you potentially save a lot by avoiding accesses to the slower hash table. Of course the bloom filter can return false positives (remember, it never gives false negatives), in which case you still have to look up the hash table.
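In code the trick is just a guard in front of the slow lookup. A minimal Scala sketch, assuming any bloom filter query with the usual false-positives-only guarantee (the names here are just for illustration) ..

// 'mightContain' is the bloom filter query kept in fast memory
// (false positives possible, false negatives impossible);
// 'slowLookup' is the expensive hash table access
class GuardedLookup[V](mightContain: String => Boolean,
                       slowLookup: String => Option[V]) {

  def get(key: String): Option[V] =
    if (!mightContain(key)) None   // definite miss - slow memory never touched
    else slowLookup(key)           // possible hit, could still be a false positive
}

Only the (rare) keys that look positive ever reach the slow table; everything else is answered from fast memory.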

I was wondering if the same technique can be generalized for lookups in database tables. For each set of indexes typically used to search a table, we can keep a separate bloom filter in fast memory, which will lead to far fewer accesses of the disk based tables. Of course this only works meaningfully for tables where lookup failures outweigh successes - Michael gives examples like URLs on a blacklist or dangerous packet signatures.

A nice trick to have up your sleeve ..