We recently developed an application for a customer to detect electricity outages using smart meter data. Taking inspiration from that work, we extended it to detect natural gas leaks in residential areas in real time, again using smart meter data.

The aim of the project was to compare real-time data with historical averages and flag any drastic increase in gas consumption. For example, take a house on a street whose historical average usage is 40 units per day, computed over a year or six months, whatever the measuring period is set to be. If the average usage then doubles or triples, to around 80 or 120 units, there may be a leak. Initially, we may see this increase for a day or two, but if the pattern continues for a week or two and the value does not return to the historical norm, we can flag it as a gas leak.

Let’s take a look at the implementation details. We simulate a real-world scenario by continuously generating customer usage data and publishing it to a Kafka topic. The detection application consumes this data as a continuous stream. After the necessary enrichments and transformations, the stream is sent to a streaming OLAP service called OAS, developed by my company, DataTorrent. OLAP, in simple terms, is the computation of aggregates over different measures in the data, for different key combinations, over time intervals at granularities such as minute, hour, and day. Further analysis is then performed on these aggregates to detect outages by applying rules. The output is visualized in UI dashboards on the DataTorrent RTS platform. The sections below look at each of these steps in detail.

Reading From Kafka:

As mentioned earlier, the input to the detection application is real-time customer usage data. In the simulation, two usage readings are generated per second for every customer. The application reads this data from a Kafka topic in a streaming fashion using the “KafkaSinglePortInputOperator” from the Malhar library. The operator scales with the number of partitions of the Kafka topic and provides fault-tolerant, idempotent reads from Kafka.
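For illustration, here is a minimal sketch of how this operator might be wired into an Apex DAG, assuming the Kafka 0.9 input operator from Malhar. The broker address and topic name are placeholders, and a console sink stands in for the parser and the rest of the pipeline described below:

```java
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.io.ConsoleOutputOperator;

@ApplicationAnnotation(name = "GasLeakDetection")
public class GasLeakDetectionApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // One partition of this operator is created per Kafka partition.
    KafkaSinglePortInputOperator in =
        dag.addOperator("KafkaInput", new KafkaSinglePortInputOperator());
    in.setClusters("localhost:9092"); // broker list (placeholder)
    in.setTopics("gas-usage");        // topic name (placeholder)
    in.setInitialOffset("EARLIEST");  // replay from the beginning on first run

    // A console sink stands in here for the parser and the rest of the DAG.
    ConsoleOutputOperator out =
        dag.addOperator("Console", new ConsoleOutputOperator());
    dag.addStream("rawUsage", in.outputPort, out.input);
  }
}
```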

The format of the messages being read is as follows:

{1516640052541, SJBH1001, 67}

1516640052541 – the timestamp when the usage was measured
SJBH1001 – the customer id
67 – the usage reading
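Since the next step parses these messages into POJOs, here is a hypothetical sketch of what that POJO and parse step could look like (field names and the parsing approach are illustrative, not the application's actual code):

```java
// Hypothetical POJO for a single usage reading; field names are
// illustrative, not the application's actual schema.
public class UsageEvent
{
  private long timestamp;    // epoch millis when the usage was measured
  private String customerId; // e.g. "SJBH1001"
  private double usage;      // metered usage units

  public UsageEvent() {}

  public UsageEvent(long timestamp, String customerId, double usage)
  {
    this.timestamp = timestamp;
    this.customerId = customerId;
    this.usage = usage;
  }

  // One way to parse the "{timestamp, customerId, usage}" message format.
  public static UsageEvent parse(String message)
  {
    String[] parts = message.replaceAll("[{}]", "").split(",");
    return new UsageEvent(Long.parseLong(parts[0].trim()),
        parts[1].trim(), Double.parseDouble(parts[2].trim()));
  }

  // getters and setters omitted for brevity
}
```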

The application DAG is shown in the screenshots below.

Enrichment:

After the data is parsed, it is enriched with the customer name, phone number, house number, street name, and city by looking up the customer id from the data. The enrichment information is fetched from a database. An in-memory cache holds the most recently used records from the database to minimize the number of database lookups and lower latency. The size of the cache can be configured for a desired SLA. An enriched POJO (Plain Old Java Object) carrying the original event information along with the enrichment information is sent to the next operator.
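Here is a minimal sketch of such a cache-backed lookup, assuming a JDBC source and an LRU cache built on LinkedHashMap (the table and column names are hypothetical):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical cache-backed lookup used by the enrichment operator.
// Holds the most recently used customer records in memory and falls
// back to a JDBC query on a miss.
public class CustomerLookup
{
  private final Connection db;
  private final Map<String, String[]> cache;

  public CustomerLookup(Connection db, final int cacheSize)
  {
    this.db = db;
    // An access-ordered LinkedHashMap gives simple LRU eviction.
    this.cache = new LinkedHashMap<String, String[]>(cacheSize, 0.75f, true)
    {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String[]> e)
      {
        return size() > cacheSize;
      }
    };
  }

  // Returns {name, phone, houseNumber, street, city} for the customer.
  public String[] lookup(String customerId) throws Exception
  {
    String[] info = cache.get(customerId);
    if (info == null) {
      try (PreparedStatement ps = db.prepareStatement(
          "SELECT name, phone, house_no, street, city FROM customers WHERE id = ?")) {
        ps.setString(1, customerId);
        try (ResultSet rs = ps.executeQuery()) {
          rs.next(); // a production version would handle a missing customer
          info = new String[] {rs.getString(1), rs.getString(2),
              rs.getString(3), rs.getString(4), rs.getString(5)};
        }
      }
      cache.put(customerId, info);
    }
    return info;
  }
}
```

The cache size passed to the constructor is what would be tuned against the desired SLA mentioned above.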

Online Analytics Service (OAS) details:

The enriched POJO has the necessary information to calculate many cumulative analytics, such as average usage by customer or city and total usage by street, at various time scales such as per-minute, hourly, or daily, which can provide useful insights into usage patterns. For this purpose, the platform provides a general-purpose real-time OLAP service called Online Analytics Service (OAS). OAS is built on top of the Druid platform.

OAS internally computes multiple aggregates, over a number of measures, for different time intervals, and for various key combinations in the data – all of which are configurable and specified by the user. For example, in the enriched data, we might be interested in the total usage per street on an hourly basis, or the average usage per city on a daily basis.

Another example helps clarify what OAS handles: a sensor's data may arrive, and hence be processed, later than data from other sensors that was created during the same time interval. All such readings need to be accounted for in the same time bucket, based on the event creation time rather than the processing time. The service also provides lookup and query of the aggregates to third-party tools, such as UI dashboards, for visualizing historical usage information.
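To make the event-time bucketing concrete, here is a toy sketch of the kind of keyed, time-bucketed aggregation OAS performs internally (greatly simplified; OAS itself also handles persistence, windowing, and query serving):

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of event-time bucketed aggregation. Readings are
// assigned to a minute bucket derived from their own timestamp, so a
// late arrival still lands in the bucket in which it was created.
public class MinuteAverage
{
  // key: "city|street|minuteBucket" -> {running sum, count}
  private final Map<String, double[]> buckets = new HashMap<>();

  public void add(String city, String street, long eventTimeMillis, double usage)
  {
    long minuteBucket = eventTimeMillis / 60_000; // event time, not arrival time
    String key = city + "|" + street + "|" + minuteBucket;
    double[] agg = buckets.computeIfAbsent(key, k -> new double[2]);
    agg[0] += usage; // running sum
    agg[1] += 1;     // count
  }

  public double average(String city, String street, long minuteBucket)
  {
    double[] agg = buckets.get(city + "|" + street + "|" + minuteBucket);
    return agg == null ? 0 : agg[0] / agg[1];
  }
}
```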

The user specifies which fields in the data are keys and which are measures; for example, customer id, street, and city are keys, and usage is a measure. They also specify which field in the data identifies the event timestamp and which time buckets to aggregate over, such as per-minute, per-hour, and per-day. The user can specify multiple key combinations for which aggregates are computed. Each combination may consist of a single key, such as street or city, or of multiple keys. The aggregations are performed for each distinct value of the combination. The user also specifies which aggregate functions to apply to the measures, such as total usage or average usage.

In this application, the following aggregations are configured:

Time buckets: 1m, 1h, 1d, 30d
Key combinations: [city], [street], [customer id], [city, street], [city, street, customer id]

The aggregates are visualized using real-time dashboards such as the ones provided by DataTorrent RTS. A user can choose which key combinations and aggregates they want to visualize. The dashboards are dynamic and show the values as they change.
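For concreteness, the chosen configuration could be captured along these lines (illustrative constants only; this is not the actual OAS schema format):

```java
// Illustrative representation of the aggregation configuration used in
// this application. These constants just capture what is being asked of
// the service: which time buckets, key combinations, and aggregates.
public class AggregationConfig
{
  static final String[] TIME_BUCKETS = {"1m", "1h", "1d", "30d"};

  static final String[][] KEY_COMBINATIONS = {
      {"city"},
      {"street"},
      {"customerId"},
      {"city", "street"},
      {"city", "street", "customerId"}
  };

  static final String[] MEASURES = {"usage"};
  static final String[] AGGREGATORS = {"SUM", "AVG"};
}
```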

The OAS service also makes the aggregates available to the source application or other applications for further analysis. In this case, the application receives the computed aggregates and sends them to downstream operators, such as the outage detection operator described below, for further analysis and business action.

Outage detection:

Aggregates from the OAS service are received by the outage detection operator, where we check the aggregates to determine if there is an outage. In our case, we apply a simple rule that compares the average value against a threshold to declare an outage, but more complex rules, or a rule engine such as the DataTorrent CEP operator, can also be used.
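A minimal sketch of such a threshold rule, following the "twice the historical average, sustained for a week" idea from the introduction (the exact thresholds and state handling in the application may differ):

```java
// Hypothetical leak rule: flag a customer when the daily average usage
// exceeds a multiple of the historical average for a sustained number
// of days. The threshold values here are illustrative.
public class LeakRule
{
  private static final double RATIO_THRESHOLD = 2.0; // 2x historical average
  private static final int SUSTAINED_DAYS = 7;       // must persist for a week

  private int consecutiveHighDays;

  // Called once per daily average aggregate received from OAS.
  public boolean onDailyAverage(double dailyAvg, double historicalAvg)
  {
    if (dailyAvg >= RATIO_THRESHOLD * historicalAvg) {
      consecutiveHighDays++;
    } else {
      consecutiveHighDays = 0; // usage returned to normal
    }
    return consecutiveHighDays >= SUSTAINED_DAYS; // flag as a leak
  }
}
```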

Visualizations:

The application provides various visualizations of the computation results. The aggregates are available for query, and the results are streamed through a WebSocket. Visualization dashboards, such as the ones provided out of the box by DataTorrent RTS, can be used to show visualizations that update in real time from the streaming data. DataTorrent RTS also comes with a built-in WebSocket gateway that facilitates communication between operators and UI dashboards. The outage events are also streamed to a visualization dashboard.
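As a sketch of how outage events might reach a dashboard, Malhar's pub-sub WebSocket output operator can publish tuples to the gateway on a topic that a widget subscribes to (the URI, topic, and upstream operator name below are placeholders):

```java
import java.net.URI;

import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;

// Sketch: publishing outage events to the gateway's pub-sub WebSocket so
// that a dashboard widget can subscribe to them.
public class LeakPublisher
{
  public static void addToDag(DAG dag)
  {
    PubSubWebSocketOutputOperator<Object> wsOut =
        dag.addOperator("LeakEvents", new PubSubWebSocketOutputOperator<Object>());
    wsOut.setUri(URI.create("ws://localhost:9090/pubsub")); // gateway address (placeholder)
    wsOut.setTopic("applications.gasleak.outages");         // dashboard topic (placeholder)
    // Wire the (hypothetical) detection operator's output to the publisher:
    // dag.addStream("outages", leakDetector.output, wsOut.input);
  }
}
```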

Widgets:

Here are some of the visualizations:

  1. OutageByStreets: a table showing the leaks detected on each street, along with the city.
  2. OutageByUsers: a table showing the leaks detected per user and whether the user has been alerted.
  3. CityWiseGasConsumption: a bar chart showing, as the name suggests, gas consumption by city.