Hotwire Tech Blog

Scribes from Hotwire Engineering

What’s the problem?

Most services handle large amounts of data, which can come from different sources such as Kafka, RabbitMQ, or ActiveMQ. A large volume of data can slow down your application or even bring it down entirely.

We know the problem, but what is the solution?

We need a realtime computation system focused on distributed processing of large data streams.

  • Apache Storm – a distributed stream processing framework written in Clojure and originally developed at BackType. It is composed of open-source components, notably ZooKeeper for cluster management, ZeroMQ for multicast messaging, and Kafka for queued messaging, and it provides massively scalable event collection.
  • Apache Spark – a general framework for large-scale data processing that supports many programming languages and concepts such as MapReduce, in-memory processing, stream processing, graph processing, and machine learning.
  • Amazon Kinesis – streams data in real time, with the ability to process thousands of data streams on a per-second basis. The service, designed for real-time apps, allows developers to pull any amount of data from any number of sources, scaling up or down as needed (similar in functionality to Apache Kafka). It is deeply integrated with other AWS services such as S3, Redshift, and DynamoDB.


Why Storm?

Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is fun to use!

A Storm application is designed as a “topology”: a directed acyclic graph (DAG) with spouts and bolts serving as the vertices. Edges in the graph are called streams and direct data from one node to another; together, the topology acts as a data transformation pipeline. At a superficial level the structure of a topology is similar to a MapReduce job, with the main difference being that data is processed in real time rather than in separate batches. Furthermore, Storm topologies run indefinitely until killed, while a MapReduce DAG must eventually end.

At its core, Storm is a platform for real-time, distributed, fault-tolerant computing. It provides a set of abstractions that help you build a system capable of analyzing large amounts of streaming data in real time across several computing nodes.

Comparing Storm with Spark: Storm exposes a lower-level abstraction, while Spark offers a higher-level one. Because Storm processes each tuple as it arrives instead of collecting records into micro-batches, it typically achieves lower latency for stream processing.

A Kinesis application consists of just one processing step, so you can’t do complex stream processing like you can with Storm unless you connect multiple Kinesis applications together. This suggests that Kinesis is, like many AWS offerings, a nice tool for simple workloads. If you need to do something complex, though, Kinesis probably cannot deliver the results you want; for that you will still need Storm.

Key features of the system:

  • Scalability
  • Guaranteed protection against data loss
  • Ease of deployment and maintenance
  • Disaster Recovery
  • Components can be written in languages other than Java

Storm elements:

  • Tuple – the basic unit of data
  • Stream – an unbounded sequence of tuples
  • Spout – a source that feeds tuples into a stream (see the sketch after this list)
  • Bolt – a processor that consumes streams and may emit new ones
  • Topology – the graph of spouts and bolts that describes how they are connected
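To make these elements concrete, here is a minimal sketch of a custom spout that emits an endless stream of values. The class name, the emitted values, and the org.apache.storm package layout are assumptions for illustration; our real topology uses the prebuilt KafkaSpout shown below.

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

// A hypothetical spout that emits an endless stream of words
public class WordSpout extends BaseRichSpout {
    private static final String[] WORDS = {"value1", "value2"};
    private SpoutOutputCollector collector;
    private int index = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);                          // avoid busy-spinning between emits
        collector.emit(new Values(WORDS[index]));  // one tuple per call
        index = (index + 1) % WORDS.length;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));      // the schema of the emitted tuples
    }
}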


How do we use it?

We have a simple application that creates a topology and submits it to the Storm cluster.

// Create the Kafka consumer (spout)
private KafkaSpout kafkaSpout() {
    if (kafkaSpout == null) {
        // ZkHosts points the spout at the ZooKeeper ensemble that tracks the Kafka brokers
        ZkHosts zkHosts = new ZkHosts(env.getZookeeperHost());
        SpoutConfig kafkaConfig = new SpoutConfig(
                zkHosts, KAFKA_TOPIC_NAME,
                "/" + KAFKA_TOPIC_NAME,   // ZooKeeper root for offset storage (assumed value)
                KAFKA_SPOUT);             // consumer id (assumed value)
        kafkaSpout = new KafkaSpout(kafkaConfig);
    }
    return kafkaSpout;
}
// Create the Storm topology, which connects the Kafka spout to the processing bolt
StormTopology stormTopology() {
    if (stormTopology == null) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT, kafkaSpout());
        // shuffleGrouping distributes the spout's tuples evenly across the bolt's tasks
        builder.setBolt(PROCESS_BOLT, processBolt()).shuffleGrouping(KAFKA_SPOUT);
        stormTopology = builder.createTopology();
    }
    return stormTopology;
}


// Submit the topology to the Storm cluster
private void run() {
    try {
        StormSubmitter.submitTopology(TOPOLOGY_NAME, stormConfig(), stormTopology());
    } catch (AlreadyAliveException | InvalidTopologyException ignored) {
        // Ignored for brevity; a production submitter should log or rethrow
    }
}
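The stormConfig() helper called by run() is not shown above; a minimal sketch of what it might contain (the worker count and debug flag are assumptions, not values from our real configuration):

// Hypothetical sketch of the stormConfig() helper used by run()
private Config stormConfig() {
    Config config = new Config();
    config.setNumWorkers(2);   // assumed worker count for the example
    config.setDebug(false);    // set to true to log every emitted tuple
    return config;
}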


The application has only one bolt, which emits a result that depends on the input.

// A simple bolt that reads a value from the input tuple and emits a result
public class ProcessBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        switch (input.getString(0)) {
            case "value1":
                basicOutputCollector.emit(new Values("result1"));
                break;
            case "value2":
                basicOutputCollector.emit(new Values("result2"));
                break;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Required so that downstream consumers know the tuple schema
        declarer.declare(new Fields("result"));
    }
}
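For quick local experiments, the same topology can also run in-process instead of being submitted with StormSubmitter. A sketch, assuming Storm’s LocalCluster API (checked exceptions omitted for brevity):

// Run the topology in-process for about ten seconds, then shut down
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, stormConfig(), stormTopology());
Utils.sleep(10_000);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();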


How about some integration tests on Storm?

We set up our application on Storm. To make sure it works correctly, we need to test the system. Running a real Storm cluster for every test would be expensive. We could spin up a plain local cluster in every test, but that is not convenient either. Storm provides a testing facility where the developer can plug a mocked source into the spout and easily read the results. It is soooo easy 🙂

@Test
public void testBasicTopology() {
    withSimulatedTimeLocalCluster(cluster -> {
        // Get the topology from the main application
        StormTopology topology =
                new Application(Environment.LOCAL).stormTopology();
        // Replace the Kafka spout's output with mocked test data
        MockedSources mockedSources = new MockedSources();
        mockedSources.addMockData(KAFKA_SPOUT, new Values("value1"));
        CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
        completeTopologyParam.setMockedSources(mockedSources);
        completeTopologyParam.setStormConf(new Config());
        // Run the topology to completion and collect everything it emitted
        Map result = completeTopology(cluster, topology, completeTopologyParam);
        // Check the tuples emitted by the bolt
        assertThat(readTuples(result, PROCESS_BOLT))
                .isEqualTo(new Values(new Values(expectedResult())));
    });
}
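Here withSimulatedTimeLocalCluster (statically imported from Storm’s Testing class) starts an in-process cluster with simulated time, completeTopology runs the topology until it goes idle and returns everything each component emitted, and readTuples extracts the output of a single component – in this case our bolt. The mocked “value1” input matches the first case in ProcessBolt, so the bolt is expected to emit “result1”.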

This is how we do it.

Storm opens up a lot of possibilities, so you should definitely try it out.