Monday, August 12, 2013

mid-2013, my Spark Odyssey...

Short intro

In this post, I'll talk about yet another technology-tester project I've worked on over the last few days: Spark.
Long story short, Spark is a handy distributed tool that helps in manipulating Big Data from several sources -- even hot streams.


In the first part, I'll put the context in place.

  1. I'll start with a gentle bunch of words about Spark and Spark-streaming.
  2. Then I'll give some details about the project I've built so far, and why.
The second part will discuss some implementation details.

  1. Setup: spark streaming
  2. Twitter stream filtered by keywords to DStream
  3. Tweets' sentiment analyzer
  4. Yahoo finance CSV data and the actor consumer
  5. Actor consumer publishing on a DStream (back-ended by a dedicated actor)
  6. Aggregation (union) over a 60-second window
  7. Publishing the results using Spray
What's missing so far (among other things, business-related ones for instance):
  1. cluster deployment...
  2. ... and configuration
  3. probably a storage strategy for later consumption (Riak, ...?)
  4. a web application that shows the results or configures the processes



I won't discuss Spark that much because there are already plenty of good blogs/articles on it. However, I'll give you just enough intuition to get the gist of it:
  • Spark is a collection of functions that work on a Sequence (List) of typed data  --  Dataset = Sequence ; operation = Function
  • Chaining these functions defines a workflow for your data  --  combination
  • Each piece of data is processed only once, by one Spark worker or another  --  Distributed
  • If it fails for some external reason, the data can be fetched again from the source and the workflow replayed entirely  --  Resilient
Spark defines a Resilient Distributed Dataset (RDD), which is very different from classical MapReduce (MR) -- because it is distributed by nature, a workflow can easily be iterative and doesn't require intermediate caching/storage.
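
To make that intuition concrete, here is a minimal sketch that uses plain Scala collections as a stand-in for an RDD. Nothing in it is Spark API: `Quote`, `quotes` and `averagePerSymbol` are hypothetical names made up for the illustration. It only shows the shape of the idea, a workflow being a chain of typed functions over a sequence.

```scala
// Plain Scala collections standing in for an RDD (no Spark dependency).
// Every name here is hypothetical and exists only for illustration.
object SequenceWorkflow {
  final case class Quote(symbol: String, price: Double)

  val quotes: List[Quote] = List(
    Quote("GOOG", 890.0),
    Quote("AAPL", 460.0),
    Quote("GOOG", 892.0),
    Quote("AAPL", 0.0) // a bogus record we want to drop
  )

  // Dataset = sequence, operation = function, workflow = chained functions.
  def averagePerSymbol(data: List[Quote]): Map[String, Double] =
    data
      .filter(_.price > 0) // drop invalid records
      .groupBy(_.symbol)   // one group per symbol
      .map { case (symbol, qs) =>
        symbol -> qs.map(_.price).sum / qs.size // average price per symbol
      }
}
```

Calling `SequenceWorkflow.averagePerSymbol(SequenceWorkflow.quotes)` runs the whole chain in one go; on a real RDD the same kind of chain would be spread over the workers.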

Also, Spark has a sub-project, spark-streaming, that allows manipulating streamed data. The idea is very simple: a DStream is defined as a sequence of RDDs, each containing the data for a certain slice of time.
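
The slicing idea can be sketched without Spark at all. The snippet below is only a model, assuming events that carry a non-negative timestamp in milliseconds; `Event` and `slice` are hypothetical names, and each inner sequence plays the role of one RDD of the DStream.

```scala
// A toy model of "a DStream is a sequence of RDDs, one per time slice".
// Not Spark code: Event and slice are hypothetical names for illustration.
object TimeSlices {
  final case class Event(timestampMs: Long, payload: String)

  // Assign each event to the slice that contains its timestamp, then return
  // the slices in chronological order. Slices without any event are simply
  // absent from the result in this sketch.
  def slice(events: Seq[Event], batchMs: Long): Seq[Seq[String]] =
    events
      .groupBy(_.timestampMs / batchMs) // slice index = timestamp / slice length
      .toSeq
      .sortBy(_._1)
      .map { case (_, batch) => batch.sortBy(_.timestampMs).map(_.payload) }
}
```

With a slice length of 5000 ms, events stamped 1000 and 4999 end up in the first slice and an event stamped 5000 opens the second one.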

Oh, RDD and DStream are typesafe -- the whole thing being Scala code.

The spark-streaming project is really well integrated and can consume a lot of streams like Twitter, File System, Kafka or even an Akka actor.


This post describes a small project, publicly available on GitHub (spark-bd), that helped me grasp the big picture of Spark after having followed the mini course here.

The project is based on another working group I'm participating in. That project, simply called project 2, was initiated, and is still supported, by the great Belgian Big Data user group.
The idea was to co-create, over several workshops, an application that, given a list of companies to analyse:
  • catches the Twitter stream and the Yahoo finance data using Storm;
  • manipulates both by either applying a sentiment analysis or simply the changes in price;
  • aggregates both pieces of information for each company by window of time;
  • continuously reports how the given companies are doing on the market.
Fairly simple and really fun... the project is still ongoing but you can check its progress here.

It was really fun, but I wanted to see how the experience would compare when writing the same application with my current preferred language (Scala) and the next technology in the Big Data field I was eagerly looking at (Spark).

IMPORTANT: since I'm not yet in a position to make a real comparison (a topic for another blog post, later on), I'll only give an insight into my current work.



The setup is very minimal for a Spark application. All we need boils down to two things:
  1. an SBT project declaring Spark and Spark-streaming as dependencies.
  2. a main class/object that configures the Spark streaming context.
The SBT project only requires a build.sbt in the root folder that contains these lines only:
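
The original file isn't reproduced in this post, so here is a minimal sketch of what such a build.sbt can look like. The project name, the version and the Scala and Spark versions are assumptions taken from a recent release; the coordinates in use back in 2013 were different.

```scala
// build.sbt: a reconstruction under assumptions, not the original file.
name := "spark-bd"

version := "0.1.0-SNAPSHOT"

// Assumed versions: Spark 3.5.1 is published for Scala 2.12.
scalaVersion := "2.12.18"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1"
```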

The only lines of interest are the last two.

Then we can create an object that will start the spark context.
Here we just asked Spark to create a streaming context on the local environment, named Project2, that will slice the DStream into RDDs of 5 seconds each.

Afterwards, we asked the context to start its work... which is nothing up to now, since nothing has been added to the context yet.
So let's do it.

Twitter stream

Here we'll consume the stream of tweets provided by Twitter's API. For that, we'll have to do only two things (a pattern emerges...?):
  1. register the four pieces of information needed for OAuth authentication to the Twitter API (so don't forget to add an application to your Twitter development account).
  2. add a tweets stream to the Spark context
Actually, the streaming context has convenient methods to add a Twitter stream.
For that it'll use the Twitter4J library (it's definitely not the best, but at least we can achieve our goal).

This library needs several keys to be set.
There are plenty of ways to do that; in my project I went for something rather straightforward: I picked the Typesafe config library and added those four configuration keys to an application.conf file, then I load each value into the related System property.

As you may (or may not) notice, the application.conf file looks up environment variables that give the values for the required key/secret/token.
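
The idea of feeding Twitter4J from the environment can be sketched without any dependency. Note that this is a swapped-in technique: the snippet reads the environment directly instead of going through the Typesafe config library and application.conf. The four property keys are the ones Twitter4J reads from System properties; the environment variable names and the `TwitterCredentials` object are hypothetical.

```scala
// Dependency-free stand-in for the application.conf approach described above.
object TwitterCredentials {
  // Twitter4J system property -> environment variable holding its value.
  // The variable names are assumptions made up for this sketch.
  val mapping: Map[String, String] = Map(
    "twitter4j.oauth.consumerKey"       -> "TWITTER_CONSUMER_KEY",
    "twitter4j.oauth.consumerSecret"    -> "TWITTER_CONSUMER_SECRET",
    "twitter4j.oauth.accessToken"       -> "TWITTER_ACCESS_TOKEN",
    "twitter4j.oauth.accessTokenSecret" -> "TWITTER_ACCESS_TOKEN_SECRET"
  )

  // Resolve every property, or report which environment variables are missing.
  def resolve(env: Map[String, String]): Either[List[String], Map[String, String]] = {
    val missing = mapping.values.filterNot(env.contains).toList.sorted
    if (missing.nonEmpty) Left(missing)
    else Right(mapping.map { case (prop, variable) => prop -> env(variable) })
  }

  // Copy the resolved values into System properties, failing fast otherwise.
  def install(env: Map[String, String] = sys.env): Unit =
    resolve(env) match {
      case Right(props) =>
        props.foreach { case (key, value) => System.setProperty(key, value) }
      case Left(missing) =>
        sys.error("Missing environment variables: " + missing.mkString(", "))
    }
}
```

Calling `TwitterCredentials.install()` once, before the stream is created, is enough; failing fast on a missing variable gives a clearer error than a rejected authentication later on.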

With that done, we can start listening for tweets:

Dead simple: after having created a model for a company/stock, we used the configured keywords as tags for the Twitter stream.
Behind the scenes, Twitter4J will access the stream endpoint with these tags, and Twitter will pre-filter all tweets based on those keywords (in hashtag, username, text, ...).

At this stage we've only added the stream, but we haven't yet said anything about how to consume it... So we can simply call print() on the stream to ask Spark to print the first 10 incoming events of each RDD.

However, that's not how we want to deal with those tweets, right? Let's adapt our stream to add relevant information for our use case.

Sentiment analyzer

Now that we have a stream of tweets, it's time to read them and give them a sentiment score based on a sentiments file. The code that reads the file and assigns a score to a tweet is not that important here, but the code is here.

However, what we would like to do is to adapt our stream with this score attached to each tweet.

As said earlier, Spark is a bunch of methods on Sequences of data, so we can now assert that it's true. 

Look at what was done in the map: we picked each tweet, fetched the list of sentiment entries that match the text, and also kept the whole original status for further computation. This results in a DStream[(List[Sentiment], Status)].

Since we only want those tweets that have at least one Sentiment that matches, we then filtered on the length of this matching list.

We combined both actions to get a resulting stream of filtered, sentiment-analyzed tweets.
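
The map-then-filter step can be sketched on a plain List, which offers the same two operations as a DStream. This is not the project's code: `Status` below is a one-field stand-in for twitter4j.Status, and the `Sentiment` fields, the tiny `lexicon` and the whole-word matching rule are assumptions.

```scala
// map + filter on a List, mirroring the two DStream operations described above.
object SentimentStep {
  final case class Sentiment(word: String, score: Int)
  final case class Status(text: String) // stand-in for twitter4j.Status

  // Hypothetical lexicon; the real project loads it from a sentiments file.
  val lexicon: List[Sentiment] = List(Sentiment("great", 3), Sentiment("awful", -3))

  // Sentiment entries whose word appears in the text (case-insensitive, whole words).
  def matches(text: String): List[Sentiment] = {
    val words = text.toLowerCase.split("\\W+").toSet
    lexicon.filter(s => words.contains(s.word))
  }

  def analyze(tweets: List[Status]): List[(List[Sentiment], Status)] =
    tweets
      .map(status => (matches(status.text), status))     // attach the matching sentiments
      .filter { case (found, _) => found.nonEmpty }       // keep tweets with at least one match
}
```

The element type of the result, `(List[Sentiment], Status)`, is the same pair type as in the DStream mentioned above.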

Okay, great, but what we really want is to have such information grouped by Company/Stock, right? So that we can compute the contribution of the social network to the health of each company.

To do so, we have to group semantically (based on the text's content and the keywords of the required stocks).
Note that it's not that straightforward because a tweet could relate to several companies at once!
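
One way to handle a tweet that relates to several companies is to emit one record per (tweet, company) pair with flatMap. The sketch below does that on plain Lists; the fields of `Stock` and `Data`, and the naive substring matching, are assumptions rather than the project's actual model.

```scala
// flatMap lets a single tweet contribute to every company it mentions.
object CompanyStep {
  final case class Stock(symbol: String, keywords: List[String])
  final case class Data(stock: Stock, text: String, score: Int)

  // One Data per (tweet, matching stock) pair. Matching is a naive,
  // case-insensitive substring test, so "pineapple" would match "apple".
  def tag(stocks: List[Stock], scored: List[(String, Int)]): List[Data] =
    scored.flatMap { case (text, score) =>
      val lower = text.toLowerCase
      stocks
        .filter(_.keywords.exists(k => lower.contains(k.toLowerCase)))
        .map(stock => Data(stock, text, score))
    }

  // A possible follow-up aggregation: total sentiment score per symbol.
  def totalBySymbol(data: List[Data]): Map[String, Int] =
    data.groupBy(_.stock.symbol).map { case (symbol, ds) => symbol -> ds.map(_.score).sum }
}
```

A tweet matching no keyword disappears from the output, while one that mentions two companies shows up twice, once per company.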

That's all! We're now done with the Twitter part: we now get a stream of Data, each gathering the Company/Stock, the tweet and its associated sentiment. This opens the door to further aggregating/grouping/reducing.


Let's now pause a bit. I encourage you to try your own stuff and play with the tweets.
For that, I also encourage you to fork/clone the original repo, where you can already start only the Twitter feed and print everything in the console by running the appropriate command:
$> sbt
sbt> run-main be.bigdata.p2.P2 twitter print GOOG AAPL

This will start Spark and filter the tweets based on the keywords associated with both GOOG and AAPL; these tweets will be analyzed as described in the last section before being printed to the console.
WARN: don't forget to export the required environment variables


In the next blog posts, I'll continue by explaining how to use Akka to construct Spark streams and how to combine several streams into a single one.

This multiplexed stream will then be used to compute aggregated data over a window of time.

Having these data in hand, we'll be able to create a Spray server that exposes them in JSON format, helping us create a client web application that simply consumes the JSON messages.
