
MobilityFlink


An open-source geospatial trajectory data streaming platform based on Flink.


MobilityFlink explores the advantages of MobilityDB datatypes and functions in the Flink environment, using the JMEOS library as middleware.

The MobilityDB project is developed by the Computer & Decision Engineering Department of the Université libre de Bruxelles (ULB) under the direction of Prof. Esteban Zimányi. ULB is an OGC Associate Member and member of the OGC Moving Feature Standard Working Group (MF-SWG).


More information about MobilityDB, including publications and presentations, can be found on the MobilityDB website.

Building a Real-Time Data Streaming Pipeline with Kafka, Apache Flink, and Postgres

Vessel Count by Area and Time Period

This query is a spatiotemporal aggregation: it counts the distinct vessels located in a specified geographic area during a specified time period, combining spatial filtering with a bounding box and temporal filtering with a time window.
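As a rough illustration of the query semantics, the Python sketch below counts the distinct vessels that report at least one position inside a bounding box during a half-open period [t_start, t_end). The record fields (mmsi, t, lon, lat) and the closed-box test (points on the border count as inside) are assumptions for illustration; the Flink job itself is written in Java and may define the area and period differently.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class AisPoint:
    """One position report; the field names are hypothetical."""
    mmsi: int   # vessel identifier
    t: float    # event time, seconds since epoch
    lon: float
    lat: float


@dataclass(frozen=True)
class BBox:
    xmin: float
    ymin: float
    xmax: float
    ymax: float

    def contains(self, lon: float, lat: float) -> bool:
        # Closed box: a point on the border intersects the box.
        return self.xmin <= lon <= self.xmax and self.ymin <= lat <= self.ymax


def count_vessels(points: Iterable[AisPoint], box: BBox, t_start: float, t_end: float) -> int:
    """Distinct vessels with at least one point inside `box` during [t_start, t_end)."""
    seen = set()
    for p in points:
        if t_start <= p.t < t_end and box.contains(p.lon, p.lat):
            seen.add(p.mmsi)  # a set, so repeated reports of one vessel count once
    return len(seen)
```

Counting distinct identifiers rather than points is what keeps a vessel that reports many times in the area from being counted more than once.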

Structure and data pipeline

The pipeline consists of three Docker images (Kafka, Flink, and MobilityDB) that run as containers started together with Docker Compose and connected internally through a bridge network.

Architecture

Data is ingested by python-producer.py, a Kafka producer that reads a CSV file and publishes its records to a Kafka topic. To use a different data file, change the CSV file referenced in python-producer.py.
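A minimal sketch of a CSV-to-Kafka producer of this kind, assuming the kafka-python client (KafkaProducer) and a hypothetical broker address and topic name. python-producer.py may use a different client, topic, or message format. Each CSV row is sent as one JSON message whose keys are the CSV header fields.

```python
import csv
import json
from typing import Iterable, Iterator


def rows_to_messages(lines: Iterable[str]) -> Iterator[bytes]:
    """Turn each CSV row into a UTF-8 JSON message keyed by the header row."""
    for row in csv.DictReader(lines):
        yield json.dumps(row).encode("utf-8")


def publish(csv_path: str, topic: str = "vessels", bootstrap: str = "kafka:9092") -> None:
    """Send every row of csv_path to `topic` (topic and broker address are hypothetical)."""
    # Imported lazily so rows_to_messages() can be used without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap)
    with open(csv_path, newline="", encoding="utf-8") as f:
        for msg in rows_to_messages(f):
            producer.send(topic, value=msg)
    producer.flush()  # block until all buffered records have been sent
```

Keeping serialization separate from sending makes the message format testable without a running broker.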

Flink consumes records directly from the Kafka topic and deserializes each message into a data structure. It then applies a watermark strategy, which assigns event timestamps, handles out-of-order data, and defines how late a record may arrive. We also define how long a source may stay idle (receiving no data) before it is marked as idle, so that it no longer holds back the watermark. To count ships, we use an aggregation function evaluated inside a window, in this case a sliding window of 10 seconds. The count is computed by CountAggregator(), which checks whether each point intersects the spatial bounding box.
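To make the event-time logic concrete, the following pure-Python simulation (not Flink code) combines a bounded-out-of-orderness watermark, sliding-window assignment, and an aggregator with the same four-method shape as Flink's AggregateFunction. The 5-second slide, the 2-second allowed delay, and the (lon, lat) tuples are assumptions; the text above only states a 10-second sliding window. Unlike Flink, which emits watermarks periodically by default, the sketch advances the watermark after every event, and source idleness is not modeled.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Window = Tuple[int, int]      # [start, end) in milliseconds
Point = Tuple[float, float]   # (lon, lat)


class BoundedOutOfOrdernessWatermark:
    """Watermark = highest event time seen so far - allowed delay - 1 ms."""

    def __init__(self, max_delay_ms: int):
        self.max_delay_ms = max_delay_ms
        self.max_ts: Optional[int] = None

    def on_event(self, ts_ms: int) -> None:
        self.max_ts = ts_ms if self.max_ts is None else max(self.max_ts, ts_ms)

    def current(self) -> int:
        if self.max_ts is None:
            return -(2**63)  # no event seen yet: event time has not advanced
        return self.max_ts - self.max_delay_ms - 1


def sliding_windows(ts_ms: int, size_ms: int, slide_ms: int) -> List[Window]:
    """Every [start, end) sliding window (offset 0) that contains ts_ms."""
    start = ts_ms - ts_ms % slide_ms  # latest window start <= ts_ms
    windows = []
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


class CountAggregator:
    """Same shape as Flink's AggregateFunction; counts points inside a box."""

    def __init__(self, xmin: float, ymin: float, xmax: float, ymax: float):
        self.box = (xmin, ymin, xmax, ymax)

    def create_accumulator(self) -> int:
        return 0

    def add(self, point: Point, acc: int) -> int:
        lon, lat = point
        xmin, ymin, xmax, ymax = self.box
        return acc + 1 if xmin <= lon <= xmax and ymin <= lat <= ymax else acc

    def get_result(self, acc: int) -> int:
        return acc

    def merge(self, a: int, b: int) -> int:
        return a + b


def run(events: Iterable[Tuple[int, Point]], size_ms: int, slide_ms: int,
        max_delay_ms: int, agg: CountAggregator) -> Dict[Window, int]:
    """Simulate event-time sliding windows; returns {window: count}."""
    wm = BoundedOutOfOrdernessWatermark(max_delay_ms)
    open_windows: Dict[Window, int] = {}
    fired: Dict[Window, int] = {}
    for ts, point in events:
        for w in sliding_windows(ts, size_ms, slide_ms):
            if w[1] - 1 <= wm.current():
                continue  # window already closed by the watermark: late record dropped
            acc = open_windows.get(w, agg.create_accumulator())
            open_windows[w] = agg.add(point, acc)
        wm.on_event(ts)
        # A window fires once the watermark reaches its last millisecond (end - 1).
        for w in sorted(open_windows):
            if w[1] - 1 <= wm.current():
                fired[w] = agg.get_result(open_windows.pop(w))
    # End of input: flush the remaining windows, as a final watermark would.
    for w in sorted(open_windows):
        fired[w] = agg.get_result(open_windows[w])
    return fired
```

With events at 1 s, 3 s, 7 s, 12 s, then a late event at 2 s, then 30 s, the record at 2 s is dropped: by the time it arrives, the watermark has already passed the end of both windows that would contain it.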

In addition, the wait-for-it.sh script delays the start of Flink so that Kafka has time to produce some tuples first.
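wait-for-it.sh is commonly used to block until a TCP host:port accepts connections. The Python function below sketches that behaviour under this assumption (the repository's script may instead wait for a fixed delay); the host, port, and timeouts are placeholders.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 30.0, interval_s: float = 0.5) -> bool:
    """Poll host:port until a TCP connection succeeds or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # A successful connect means the service is listening; close it right away.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:  # refused, unreachable, or timed out
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)
```

A listening port only shows that the broker is up, not that the topic already holds data, which is why a startup delay can still be useful on top of such a check.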

Prerequisites

Docker, Maven 3.9.6, Java 21, and JMEOS. The JMEOS JAR file is already included in flink-processor/jar; to install a newer version of JMEOS, download the JAR file from https://github.com/MobilityDB/JMEOS/tree/main/jar.

To compile and run

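First, build a Docker image for each component (MobilityDB in postgres, the Kafka producer, and the Flink processor) by running docker build in its directory; the Flink processor must be packaged with Maven before its image is built. Then return to the main directory and start all containers with Docker Compose: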

cd postgres
docker build -t postgres .
cd ..
cd kafka-producer
docker build -t kafka-producer .
cd ..
cd flink-processor
mvn clean package
docker build -t flink-processor .
cd ..
docker-compose up -d

Results

Kafka producer

Flink processor

BerlinMOD-9 × 3 streaming forms: the parity matrix on Flink

The streaming-side parity matrix runs all nine BerlinMOD reference queries (Q1..Q9) on this runtime, each in three streaming forms: continuous (always on, with per-event emission), windowed (aggregation over tumbling 10-second windows), and snapshot (evaluated on a 5-second tick). The snapshot form is the parity oracle: its output at watermark T equals the batch BerlinMOD result for the same query on the data up to T.
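The three forms can be illustrated on Q1 ("which vehicles have appeared in the stream?") with a small Python sketch over (time, vehicle) records. It is not the Java implementation: the record shape is hypothetical, and the snapshot function assumes that all records with timestamps up to T are present when tick T is evaluated, which is what a watermark at T is meant to guarantee. Comparing each snapshot with a batch evaluation on the data up to T checks the parity property described in the paragraph above.

```python
from typing import Dict, List, Set, Tuple

Event = Tuple[int, str]  # (event time in seconds, vehicle id); hypothetical record shape


def q1_continuous(events: List[Event]) -> List[Event]:
    """Continuous form: emit (time, vehicle) the first time each vehicle is seen."""
    seen: Set[str] = set()
    out: List[Event] = []
    for t, v in events:
        if v not in seen:
            seen.add(v)
            out.append((t, v))
    return out


def q1_windowed(events: List[Event], size: int = 10) -> Dict[int, Set[str]]:
    """Windowed form: distinct vehicles per tumbling window, keyed by window start."""
    windows: Dict[int, Set[str]] = {}
    for t, v in events:
        windows.setdefault(t - t % size, set()).add(v)
    return windows


def q1_batch(events: List[Event], upto: int) -> Set[str]:
    """Batch reference: all vehicles with at least one record at or before `upto`."""
    return {v for t, v in events if t <= upto}


def q1_snapshot(events: List[Event], tick: int = 5) -> Dict[int, Set[str]]:
    """Snapshot form: incremental state reported at every tick T (stand-in for watermark T)."""
    ordered = sorted(events)
    horizon = ordered[-1][0] if ordered else 0
    state: Set[str] = set()
    out: Dict[int, Set[str]] = {}
    i = 0
    for T in range(tick, horizon + tick, tick):
        while i < len(ordered) and ordered[i][0] <= T:
            state.add(ordered[i][1])
            i += 1
        out[T] = set(state)  # copy, so later ticks do not mutate earlier results
    return out
```

The snapshot keeps incremental state rather than recomputing from scratch, so agreement with q1_batch at every tick is a real check rather than a tautology.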

| Q | Topic | Continuous | Windowed | Snapshot |
|---|---|---|---|---|
| Q1 | "which vehicles have appeared in the stream?" | ✓ | ✓ | ✓ |
| Q2 | "where is vehicle X at time T?" | ✓ | ✓ | ✓ |
| Q3 | "vehicles within d of P at time T?" | ✓ | ✓ | ✓ |
| Q4 | "vehicles entered region R, and when?" | ✓ | ✓ | ✓ |
| Q5 | "pairs of vehicles meeting near P" | ✓ | ✓ | ✓ |
| Q6 | "cumulative distance per vehicle" | ✓ | ✓ | ✓ |
| Q7 | "first passage of vehicles through POIs" | ✓ | ✓ | ✓ |
| Q8 | "vehicles close to a road segment" | ✓ | ✓ | ✓ |
| Q9 | "distance between vehicles X and Y at time T" | ✓ | ✓ | ✓ |
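Several of these queries (for example Q2 and Q9) need a vehicle's position at an arbitrary time T between two reports. A common approach, and the default behaviour of MobilityDB temporal points, is linear interpolation between consecutive instants. The planar Python sketch below assumes (t, x, y) samples sorted by time and returns None outside the observed period; it is an illustration, not a reproduction of the Q2 function classes.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple

Sample = Tuple[float, float, float]  # (t, x, y), sorted by t


def position_at(samples: List[Sample], t: float) -> Optional[Tuple[float, float]]:
    """Linearly interpolate the position at time t; None outside the observed span."""
    if not samples or t < samples[0][0] or t > samples[-1][0]:
        return None
    times = [s[0] for s in samples]
    i = bisect_right(times, t)  # index of the first sample strictly after t
    if i == len(samples):
        return samples[-1][1], samples[-1][2]  # t equals the last timestamp
    t0, x0, y0 = samples[i - 1]
    t1, x1, y1 = samples[i]
    f = (t - t0) / (t1 - t0)  # t0 <= t < t1, so the denominator is positive
    return x0 + f * (x1 - x0), y0 + f * (y1 - y0)
```

For Q9, the same function applied to both vehicles at time T gives two positions whose distance can then be computed.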

All 27 of the 27 cells are implemented, completing the MobilityFlink row of the parity matrix. Each cell has a dedicated Q<N>{Continuous,Windowed,Snapshot}Function class in flink-processor/src/main/java/berlinmod/ and is verified locally by the companion BerlinMODQ<N>LocalTest driver, which runs on a Flink mini-cluster.
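Because the classes follow the naming pattern Q<N>{Continuous,Windowed,Snapshot}Function, a small script can check that the matrix is complete. This Python sketch assumes one public class per .java file named after the class (standard Java practice), located directly under flink-processor/src/main/java/berlinmod/; missing_classes returns the expected names for which no file exists.

```python
from pathlib import Path
from typing import List

FORMS = ("Continuous", "Windowed", "Snapshot")


def expected_classes(n_queries: int = 9) -> List[str]:
    """Q<N>{Continuous,Windowed,Snapshot}Function for N = 1..n_queries."""
    return [f"Q{n}{form}Function" for n in range(1, n_queries + 1) for form in FORMS]


def missing_classes(src_dir: str) -> List[str]:
    """Expected class names that have no matching .java file in src_dir."""
    root = Path(src_dir)
    return [c for c in expected_classes() if not (root / f"{c}.java").is_file()]
```

Run from the repository root as missing_classes("flink-processor/src/main/java/berlinmod"); an empty list means every cell has a class file, though it says nothing about whether the local tests pass.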

On the same scale-factor corpus, the streaming snapshot form converges to the batch BerlinMOD result, and it is checked against the cross-platform outputs in MobilityDB-BerlinMOD.

Spatial predicates currently use pure-Java utilities for great-circle distance (Haversine) and planar segment distance (SegmentDistance). Each call site is marked TODO(meos) and is to be migrated to the JMEOS bridge once JMEOS#15 (the regeneration for MEOS 1.4) settles.
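For reference, the two geometric utilities can be sketched in Python as follows. The Earth radius (6,371,000 m) is an assumption, as is reading SegmentDistance as the planar distance from a point to a line segment; the Java utilities may differ in constants and signatures.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # assumed mean Earth radius; the Java utility may use another value


def haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance in metres between two (lon, lat) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    # min() guards against rounding pushing sqrt(a) slightly above 1.
    return 2 * EARTH_RADIUS_M * math.asin(min(1.0, math.sqrt(a)))


def segment_distance(px: float, py: float, ax: float, ay: float, bx: float, by: float) -> float:
    """Planar (Euclidean) distance from point P to segment AB, in the input units."""
    dx, dy = bx - ax, by - ay
    len2 = dx * dx + dy * dy
    if len2 == 0.0:
        return math.hypot(px - ax, py - ay)  # degenerate segment: A == B
    # Project P onto line AB, then clamp the projection parameter to [0, 1].
    t = max(0.0, min(1.0, ((px - ax) * dx + (py - ay) * dy) / len2))
    return math.hypot(px - (ax + t * dx), py - (ay + t * dy))
```

The Haversine formula treats the Earth as a sphere, so its results can differ from ellipsoidal (geodesic) distances by a fraction of a percent.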

The Kafka-source entry points for Q2 and Q3 are BerlinMODQ2Main and BerlinMODQ3Main; the companion producer is python-producer-berlinmod.py. Generate a BerlinMOD CSV with the upstream generator (meos/examples/data/generate_berlinmod_trips.sql in MobilityDB) at any scale factor and feed it to the producer. The form-by-form definition with default parameters lives in doc/berlinmod-q3-streaming-forms.md.
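Q3 ("vehicles within d of P at time T?") can be sketched in snapshot form by keeping each vehicle's last reported position at or before T and filtering by great-circle distance to P. The record layout (vehicle id, time, lon, lat), the point P, and the radius d are hypothetical, and using the last report instead of interpolating the position at T is a simplification; the actual form definitions and default parameters are those in doc/berlinmod-q3-streaming-forms.md.

```python
import math
from typing import Dict, Iterable, Set, Tuple

Record = Tuple[str, float, float, float]  # (vehicle id, time, lon, lat); hypothetical layout


def haversine_m(lon1: float, lat1: float, lon2: float, lat2: float, r: float = 6_371_000.0) -> float:
    """Great-circle distance in metres on a sphere of radius r (inputs in degrees)."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dl = math.radians(lon2 - lon1)
    a = math.sin((p2 - p1) / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    return 2 * r * math.asin(min(1.0, math.sqrt(a)))


def q3_snapshot(records: Iterable[Record], p: Tuple[float, float], d_m: float, T: float) -> Set[str]:
    """Vehicles whose last report at or before T lies within d_m metres of p = (lon, lat)."""
    last: Dict[str, Tuple[float, float, float]] = {}
    for vid, t, lon, lat in records:
        # Ignore records after T; keep the latest one per vehicle.
        if t <= T and (vid not in last or t >= last[vid][0]):
            last[vid] = (t, lon, lat)
    return {v for v, (_, lon, lat) in last.items() if haversine_m(lon, lat, p[0], p[1]) <= d_m}
```

Because only records at or before T contribute, the result for a given T does not change when later records arrive, which is the property the snapshot form relies on.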

Sibling parity work in the ecosystem
