For the demo application, we generate a stream of taxi ride events from a public dataset of the New York City Taxi and Limousine Commission (TLC). The data set consists of records about taxi trips in New York City from 2009 to 2015. We took some of this data and converted it into a data set of taxi ride events by splitting each trip record into a ride start and a ride end event. The events have the following schema:
rideId: Long
time: DateTime // start or end time
isStart: Boolean // true = ride start, false = ride end
location: GeoPoint // lon/lat of pick-up or drop-off location
passengerCnt: short
travelDist: float // -1 on start events
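To make the splitting step concrete, here is a minimal sketch in plain Scala. The TripRecord type, its field names, and the use of epoch milliseconds instead of DateTime are assumptions made for illustration; the demo's own classes may look different.

```scala
// Hypothetical types for illustration only; not the demo's actual classes.
case class GeoPoint(lon: Double, lat: Double)

// One row of the original trip data (assumed layout, times in epoch millis).
case class TripRecord(
  rideId: Long,
  pickupTime: Long,
  dropoffTime: Long,
  pickup: GeoPoint,
  dropoff: GeoPoint,
  passengerCnt: Short,
  travelDist: Float)

// One ride event, following the schema above (time as epoch millis here).
case class TaxiRide(
  rideId: Long,
  time: Long,
  isStart: Boolean,
  location: GeoPoint,
  passengerCnt: Short,
  travelDist: Float)

object TripSplitter {
  // Turns one trip record into a start event and an end event.
  def split(trip: TripRecord): Seq[TaxiRide] = Seq(
    // start event: pick-up time and location, travel distance is -1
    TaxiRide(trip.rideId, trip.pickupTime, true, trip.pickup, trip.passengerCnt, -1f),
    // end event: drop-off time and location, carries the real distance
    TaxiRide(trip.rideId, trip.dropoffTime, false, trip.dropoff, trip.passengerCnt, trip.travelDist)
  )
}
```

Both events share the same rideId, so a start event can later be matched with its end event if needed.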
We implemented a custom SourceFunction to serve a DataStream[TaxiRide] from the ride event data set. In order to generate the stream as realistically as possible, events are emitted according to their timestamps. Two events that occurred ten minutes apart in reality are ingested by Flink ten minutes apart as well. A speed-up factor can be specified to “fast-forward” the stream, i.e., with a speed-up factor of 2.0, these events are served five minutes apart. Moreover, the source function adds a configurable random delay to each event to simulate real-world jitter. Given this stream of taxi ride events, our task is to compute every five minutes the number of passengers that arrived within the last 15 minutes at locations in New York City by taxi.
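The arithmetic behind the speed-up factor and the random delay can be sketched as follows. This is not the code of the demo's source function: the object and method names are hypothetical, and the uniformly distributed jitter is an assumption.

```scala
import scala.util.Random

object ReplayTiming {
  // Wall-clock milliseconds to wait between two events whose event
  // timestamps (epoch millis) are prevEventTime and nextEventTime.
  def servingGapMillis(prevEventTime: Long, nextEventTime: Long, speedup: Double): Long = {
    require(speedup > 0, "speed-up factor must be positive")
    math.round((nextEventTime - prevEventTime) / speedup)
  }

  // Adds a random delay in [0, maxDelayMillis) to a serving gap.
  // Assumption: uniform jitter; the demo may use another distribution.
  def withJitter(gapMillis: Long, maxDelayMillis: Long, rnd: Random): Long =
    gapMillis + (rnd.nextDouble() * maxDelayMillis).toLong
}
```

With a speed-up factor of 2.0, a ten-minute gap (600,000 ms) becomes 300,000 ms of wall-clock time; with a factor of 600, it shrinks to one second.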
As a first step we obtain a StreamExecutionEnvironment and set the TimeCharacteristic to EventTime. Event time mode guarantees consistent results even in case of historic data or data which is delivered out-of-order.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Next, we define the data source that generates a DataStream[TaxiRide] with at most 60 seconds serving delay (events are out of order by max. 1 minute) and a speed-up factor of 600 (10 minutes are served in 1 second).
// Define the data source
val rides: DataStream[TaxiRide] = env.addSource(new TaxiRideSource(
  "./data/nycTaxiData.gz", 60, 600.0f))
Since we are only interested in locations that people travel to (and not where they come from) and because the original data is a little bit messy (locations are not always correctly specified), we apply a few filters to first cleanse the data.
val cleansedRides = rides
  // filter for ride end events
  .filter( !_.isStart )
  // filter for events in NYC
  .filter( r => NycGeoUtils.isInNYC(r.location) )
The location of a taxi ride event is defined as a pair of continuous longitude/latitude values. We need to map them into a finite set of regions in order to be able to aggregate events by location. We do this by defining a grid of approx. 100x100 meter cells on the area of New York City. We use a utility function to map event locations to cell ids and extract the passenger count as follows:
// map location coordinates to cell Id, timestamp, and passenger count
val cellIds: DataStream[(Int, Long, Short)] = cleansedRides
  .map { r =>
    ( NycGeoUtils.mapToGridCell(r.location), r.time.getMillis, r.passengerCnt )
  }
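The post does not show how the grid utility works, so the following is only a rough sketch of one way to map coordinates to cell ids and, inversely, to look up the center of a cell. GridSketch is a hypothetical stand-in for NycGeoUtils; the bounding box and the cell sizes in degrees are assumed values chosen to give cells of roughly 100 meters.

```scala
// Hypothetical stand-in for NycGeoUtils; all constants are assumptions.
object GridSketch {
  // Assumed bounding box around New York City.
  val LonWest = -74.05
  val LonEast = -73.70
  val LatNorth = 41.0
  val LatSouth = 40.5
  // Approximate size of a ~100 m cell in degrees at this latitude.
  val DeltaLon = 0.0012
  val DeltaLat = 0.0009
  // Number of cells in one west-to-east row of the grid.
  val CellsPerRow: Int = math.ceil((LonEast - LonWest) / DeltaLon).toInt

  // True if the point lies inside the bounding box.
  def isInArea(lon: Double, lat: Double): Boolean =
    lon >= LonWest && lon < LonEast && lat > LatSouth && lat <= LatNorth

  // Cell ids are numbered row by row, starting in the north-west corner.
  def mapToGridCell(lon: Double, lat: Double): Int = {
    val x = math.floor((lon - LonWest) / DeltaLon).toInt
    val y = math.floor((LatNorth - lat) / DeltaLat).toInt
    x + y * CellsPerRow
  }

  // Returns (lon, lat) of the center of the given cell.
  def cellCenter(cellId: Int): (Double, Double) = {
    val x = cellId % CellsPerRow
    val y = cellId / CellsPerRow
    (LonWest + (x + 0.5) * DeltaLon, LatNorth - (y + 0.5) * DeltaLat)
  }
}
```

Because every point inside a cell maps to the same id, the cell id can serve as a grouping key, and the cell center gives a representative coordinate for display.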
After these preparation steps, we have the data that we would like to aggregate. Since we want to compute the passenger count for each location (cell id), we start by keying (partitioning by key) the stream by cell id (_._1). Subsequently, we define a sliding time window and run a WindowFunction by calling apply():
val passengerCnts: DataStream[(Int, Long, Int)] = cellIds
  // key stream by cell Id
  .keyBy(_._1)
  // define sliding window on keyed stream
  .timeWindow(Time.minutes(15), Time.minutes(5))
  // count events in window
  .apply { (
    cell: Int,
    window: TimeWindow,
    events: Iterable[(Int, Long, Short)],
    out: Collector[(Int, Long, Int)]) =>
      out.collect( ( cell, window.getEnd, events.map( _._3 ).sum ) )
  }
The timeWindow() operation groups stream events into finite sets of records on which a window or aggregation function can be applied. For our application, we call apply() to process the windows using a WindowFunction. The WindowFunction receives four parameters: the key of the window (here, the cell id), a Window object that contains details such as the start and end time of the window, an Iterable over all elements in the window, and a Collector to collect the records emitted by the WindowFunction. We want to count the number of passengers that arrive within the window’s time bounds. Therefore, we have to emit a single record that contains the grid cell id, the end time of the window, and the sum of the passenger counts, which is computed by extracting the individual passenger counts from the iterable (events.map( _._3 )) and summing them (.sum).
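To see what the sliding window computes, the following plain Scala sketch reproduces the aggregation on an in-memory list of (cell id, timestamp, passenger count) records, without Flink. It assumes that windows are aligned to multiples of the slide interval and cover [start, start + size); it ignores watermarks and out-of-order handling, and the names are hypothetical.

```scala
object SlidingWindowSketch {
  // End timestamps (exclusive) of all sliding windows that contain ts.
  // Assumes ts >= 0 and window starts at multiples of slideMs.
  def windowEnds(ts: Long, sizeMs: Long, slideMs: Long): Seq[Long] = {
    val lastStart = ts - (ts % slideMs) // latest window start <= ts
    // walk back over all window starts that are still > ts - sizeMs
    (lastStart until (ts - sizeMs)).by(-slideMs).map(_ + sizeMs)
  }

  // Sums passenger counts per (cell id, window end).
  def passengerCounts(
      events: Seq[(Int, Long, Short)],
      sizeMs: Long,
      slideMs: Long): Map[(Int, Long), Int] =
    events
      .flatMap { case (cell, ts, cnt) =>
        // assign the event to every window that covers its timestamp
        windowEnds(ts, sizeMs, slideMs).map(end => ((cell, end), cnt.toInt))
      }
      .groupBy(_._1)
      .map { case (key, values) => key -> values.map(_._2).sum }
}
```

With a 15-minute window that slides every 5 minutes, each event falls into three windows, so every arrival contributes to three consecutive results for its cell.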
Finally, we translate the cell id back into a GeoPoint (referring to the center of the cell) and print the result stream to the standard output. The final env.execute() call takes care of submitting the program for execution.
val cntByLocation: DataStream[(Int, Long, GeoPoint, Int)] = passengerCnts
  // map cell Id back to GeoPoint
  .map( r => (r._1, r._2, NycGeoUtils.getGridCellCenter(r._1), r._3 ) )

cntByLocation
  // print to console
  .print()

env.execute("Total passenger count per location")
If you followed the instructions to import the demo code into your IDE, you can run the SlidingArrivalCount.scala program by executing its main() method. You will see Flink’s log messages and the computed results being printed to the standard output.
You might wonder why the program produces results much faster than once every five minutes per location. This is due to the event time processing mode. Since all time-based operations (such as windows) are based on the timestamps of the events, the program becomes independent of the speed at which the data is served. This also means that you can process historic data which is read at full speed from some data store and data which is continuously produced with exactly the same program.
Our streaming program will run for a few minutes until the packaged data set is completely processed but you can terminate it at any time. As a next step, we show how to write the result stream into an Elasticsearch index.
Prepare the Elasticsearch
The Flink Elasticsearch connector depends on Elasticsearch 1.7.3. Follow these steps to set up Elasticsearch and to create an index.
Download Elasticsearch 1.7.3 as a .tar (or .zip) archive here.
Extract the archive file:
tar xvfz elasticsearch-1.7.3.tar.gz
Enter the extracted directory and start Elasticsearch
cd elasticsearch-1.7.3
./bin/elasticsearch
Create an index called “nyc-idx”:
curl -XPUT "http://localhost:9200/nyc-idx"
Create an index mapping called “popular-locations”:
curl -XPUT "http://localhost:9200/nyc-idx/_mapping/popular-locations" -d'
{
"popular-locations" : {
"properties" : {
"cnt": {"type": "integer"},
"location": {"type": "geo_point"},
"time": {"type": "date"}
}
}
}'
The SlidingArrivalCount.scala program is prepared to write data to the Elasticsearch index you just created but requires a few parameters to be set at the beginning of the main() function. Please set the parameters as follows:
val writeToElasticsearch = true
val elasticsearchHost = // look up the IP address in the Elasticsearch logs
val elasticsearchPort = 9300
Now, everything is set up to fill our index with data. When you run the program by executing the main() method again, the program will write the resulting stream to the standard output as before but also insert the records into the nyc-idx Elasticsearch index.
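For orientation, a result record has to be turned into a document whose fields match the popular-locations mapping. The sketch below shows one possible shape of such a document as a plain Scala map; it is not the demo's sink code. The object and method names are hypothetical, and writing the time as epoch milliseconds is an assumption. The "lat,lon" string is one of the formats Elasticsearch accepts for geo_point fields.

```scala
object IndexDocSketch {
  // Builds the fields of one "popular-locations" document.
  // Field names follow the mapping; the value formats are assumptions.
  def toDocument(timeMillis: Long, lon: Double, lat: Double, cnt: Int): Map[String, String] =
    Map(
      "time" -> timeMillis.toString,  // window end time as epoch millis (assumed)
      "location" -> s"$lat,$lon",     // geo_point written as "lat,lon"
      "cnt" -> cnt.toString           // number of arriving passengers
    )
}
```

Note the order of the coordinates: the string form of a geo_point puts the latitude first, whereas the ride events describe locations as lon/lat.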
If you later want to clear the nyc-idx index, you can simply drop the mapping by running
curl -XDELETE 'http://localhost:9200/nyc-idx/popular-locations'
and create the mapping again with the previous command.
Visualizing the results with Kibana
In order to visualize the data that is inserted into Elasticsearch, we install Kibana 4.1.3 which is compatible with Elasticsearch 1.7.3. The setup is basically the same as for Elasticsearch.
1. Download Kibana 4.1.3 for your environment here.
2. Extract the archive file.
3. Enter the extracted folder and start Kibana by running the start script: ./bin/kibana
4. Open http://localhost:5601 in your browser to access Kibana.
Next we need to configure an index pattern. Enter the index name “nyc-idx” and click on “Create”. Do not uncheck the “Index contains time-based events” option. Now, Kibana knows about our index and we can start to visualize our data.
First click on the “Discover” button at the top of the page. You will find that Kibana tells you “No results found”.
This is because Kibana restricts time-based events by default to the last 15 minutes. Since our taxi ride data stream starts on January 1st, 2013, we need to adapt the time range that is considered by Kibana. This is done by clicking on the label “Last 15 Minutes” in the top right corner and entering an absolute time range starting at 2013-01-01 and ending at 2013-01-06.
We have told Kibana where our data is and the valid time range and can continue to visualize the data. For example we can visualize the arrival counts on a map. Click on the “Visualize” button at the top of the page, select “Tile map”, and click on “From a new search”.
See the following screenshot for the tile map configuration (left-hand side).
Another interesting visualization is to plot the number of arriving passengers over time. Click on “Visualize” at the top, select “Vertical bar chart”, and select “From a new search”. Again, have a look at the following screenshot for an example of how to configure the chart.
Kibana offers many more chart types and visualization options which are out of the scope of this post. You can easily play around with this setup, explore Kibana’s features, and implement your own Flink DataStream programs to analyze taxi rides in New York City.
We’re done and hope you had some fun
In this blog post we demonstrated how to build a real-time dashboard application with Apache Flink, Elasticsearch, and Kibana. By supporting event-time processing, Apache Flink is able to produce meaningful and consistent results even for historic data or in environments where events arrive out-of-order. The expressive DataStream API with flexible window semantics results in significantly less custom application logic compared to other open source stream processing solutions. Finally, connecting Flink with Elasticsearch and visualizing the real-time data with Kibana is just a matter of a few minutes. We hope you enjoyed running our demo application and had fun playing around with the code.
Fabian Hueske is a PMC member of Apache Flink. He has been contributing to Flink since its earliest days, when it started as a research project as part of his PhD studies at TU Berlin. Fabian did internships with IBM Research, SAP Research, and Microsoft Research and is a co-founder of data Artisans, a Berlin-based start-up devoted to fostering Apache Flink. He is interested in distributed data processing and query optimization.