Guest author Bram Steenwinckel describes how to perform semantic reasoning over streaming data in your knowledge graph.
This blog post and the code are delivered by the IDLAB research group of Ghent University - imec from Belgium.
Semantic reasoning with streaming data does not necessarily require specialised formalisms and tools. You can perform stream reasoning using Stardog with a relatively simple Python extension, which I’ll demonstrate below.
Companies around the world are connecting their equipment, machinery and even people to the worldwide web. The Internet of Things (IoT for short) defines these connected things as smart devices, all of which are producing floods of data.
This data has become a target for business intelligence and analytics to generate important business insights. However, the raw data is often too opaque to process directly, so a popular approach is to annotate it with metadata and integrate it into a knowledge graph (possibly after intermediate filtering and aggregation). Semantically enriched graph data can then be stored in Stardog for subsequent analytics in the form of logical or statistical reasoning.
There are, however, important practical cases where analysing static data is of less interest because one wants to react directly to behavioural changes seen in the data.
Take, for example, a smart thermostat installed at your home, which has a temperature sensor producing observations. When the temperature gets low, we want our heating devices to be switched on. If we first offload all the data acquired during the day, we can create an efficient heating plan for the following days. But this approach is not very reactive or adaptive. We want our system to analyse the data in near real time and take action when necessary.
Reacting to or analysing the data as it arrives may sound simple in the context of the smart thermostat example above. But such a paradigm offers companies a wide range of exciting use cases, such as predictive maintenance.
Instead of semantically enriching the data when (or after) it hits the database, we could also semantify the data when it leaves the sensor. Raw observations then become a stream of semantic observations.
Most of the time, these semantic observations have additional metadata, specifying the generating sensors, the operating behaviour or other characteristics defined by a semantic model or ontology. Some great tools to start semantifying your raw sensor data, or data in general, can be found online.
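To make the idea of a semantic observation concrete, here is a minimal sketch that turns one raw reading into a few RDF triples. It uses plain string formatting rather than any particular tool. The `sosa:hasSimpleResult` and `hasEpochTime` predicates are the ones used by the rule later in this post; the observation and sensor IRIs under `http://example.com/` and the function name `semantify` are assumptions made for illustration only.

```python
SOSA = "http://www.w3.org/ns/sosa/"
EX = "http://example.com"
XSD = "http://www.w3.org/2001/XMLSchema#"
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"


def semantify(sensor_id: str, value: float, epoch_time: int) -> str:
    """Return one raw reading as N-Triples (one triple per line)."""
    # Hypothetical IRI scheme for observations and sensors
    obs = f"<{EX}/observation/{sensor_id}/{epoch_time}>"
    sensor = f"<{EX}/sensor/{sensor_id}>"
    triples = [
        f"{obs} <{RDF_TYPE}> <{SOSA}Observation> .",
        f"{obs} <{SOSA}madeBySensor> {sensor} .",
        f'{obs} <{SOSA}hasSimpleResult> "{value}"^^<{XSD}double> .',
        f'{obs} <{EX}#hasEpochTime> "{epoch_time}"^^<{XSD}integer> .',
    ]
    return "\n".join(triples)


if __name__ == "__main__":
    print(semantify("thermostat", 21.5, 1600000000))
```

A string like this is what would travel over the stream in place of the bare number 21.5.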
Once semantically enriched data is streaming, one can ask if it is possible to perform reasoning on the fly without storing everything in a single, global knowledge graph upfront. One example would be applying reasoning rules encoding important business logic.
The answer is yes, it is possible with Stardog as the following example will demonstrate.
Let’s get deeper into the smart thermostat example. The diagram below shows the different steps of rule-based reasoning in Stardog over a stream of temperature observations.
First, the semantic data from our thermostat flows into a stream processing platform such as Apache Kafka. Next, a Kafka consumer loads the observations into a Stardog database. The number of observations loaded at once (the window) depends on the use case. The database then reasons over the loaded data, taking into account the predefined rules and the ontology. Both the ontology and the rules are pre-loaded when the database for the window of observations is provisioned. Finally, the inferences are pushed back to the streaming platform using a Kafka producer. As the results are also streamed, other operations can depend on them or act on them.
Suppose, for example, that we want to detect a rise in temperature in a streaming fashion:
1. We load the semantic temperature observations into our stream processing platform.
2. The semantic data is loaded into a Stardog database using a sliding window containing two observations.
3. A simple semantic rule checks whether the temperature value of the first observation is sufficiently lower than that of the second one.
4. The result is pushed back to the streaming platform.
It’s now time for some technical details of these processing steps.
We use a simple Python script to build a Kafka consumer and read the observations from the streaming platform.
import json

from kafka import KafkaConsumer

def _initialise():
    # SEMANTIC_EVENTS_TOPIC and KAFKA_BROKER_URL are configuration constants defined elsewhere
    consumer = KafkaConsumer(
        SEMANTIC_EVENTS_TOPIC,
        bootstrap_servers=KAFKA_BROKER_URL,
        # each message value is a JSON document
        value_deserializer=lambda value: json.loads(value),
    )
    return consumer

if __name__ == '__main__':
    consumer = _initialise()
    ...
    for message in consumer:
        append_message(message, 'thermostat', 'temperature')
        # DO something
As discussed, we will analyse the semantic observations in a sliding window of size 2:
# window_queue[box][metric] is a list of windows; it is assumed to start
# with one empty window, e.g. {'thermostat': {'temperature': [[]]}}
def append_message(message, box, metric):
    if len(window_queue[box][metric][-1]) < 2:
        # There is some space left in our window
        window_queue[box][metric][-1].append(message)
    else:
        # take the last message from the previous window
        p_message = window_queue[box][metric][-1][1:]
        # add a new window to our window queue
        window_queue[box][metric].append([])
        # add the last known message
        window_queue[box][metric][-1].extend(p_message)
        # add the current message
        window_queue[box][metric][-1].append(message)
We use a window queue to keep track of all the sliding windows. As data can be streamed at a rather high rate, we must ensure the windows are analysed in order. The next step is to load the window into a Stardog database to start the reasoning process.
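For readers who want to experiment with the windowing logic in isolation, the same overlapping-window behaviour can be sketched with a bounded `collections.deque`. This is an alternative formulation and not the code used in the example: the generator name `sliding_windows` is hypothetical, and unlike the window queue above it only yields full windows and does not retain earlier ones.

```python
from collections import deque
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def sliding_windows(stream: Iterable[T], size: int = 2) -> Iterator[List[T]]:
    """Yield overlapping windows of `size` consecutive items, in arrival order."""
    window = deque(maxlen=size)  # the oldest item is dropped automatically
    for item in stream:
        window.append(item)
        if len(window) == size:
            # copy the window so later appends do not change what was yielded
            yield list(window)


if __name__ == "__main__":
    for w in sliding_windows([18.0, 19.0, 31.0]):
        print(w)
```

Because a generator hands out windows one at a time, the ordering requirement mentioned above is preserved as long as a single consumer iterates over it.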
Before we can start reasoning, we should launch Stardog and get the data into a database. The naive approach would be to create a fresh database for each window. That would severely impact the latency and throughput of the platform, since the input rate can be high while database initialisation is a rather heavyweight operation (it needs to set up the storage backend with all indices and data structures for transactions and, in addition, pre-process the rules). Instead, we initialise a fixed-size pool of Stardog databases, each containing the rule (and, if needed, any additional ontology) and ready for use.
from multiprocessing import Manager

import stardog

DATABASES = 10  # size of the database pool

# Multiprocessing manager used to keep track of all databases
manager = Manager()
# True means the database at that index is free
db_pool = manager.list([True for x in range(DATABASES)])
# create the database pool
for i in range(DATABASES):
    try:
        with stardog.Admin(**conn_details) as admin:
            admin.database('db' + str(i)).drop()
    except Exception:
        print("no database to drop")
    with stardog.Admin(**conn_details) as admin:
        admin.new_database('db' + str(i))
    with stardog.Connection('db' + str(i), **conn_details) as conn:
        conn.begin()
        # preload the rule
        conn.add(stardog.content.File('rule.ttl'))
        conn.commit()
This setup enables very fast provisioning of databases when data arrives. In addition, the pool supports distributing the workload over multiple databases and reasoning on multiple windows in parallel, which further boosts the throughput of the system.
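The bookkeeping behind such a pool is just a list of booleans, one per database. As a rough sketch, claiming and returning a slot could look as follows. The helper names `acquire_database` and `release_database` are hypothetical and are not part of the example code; they work on any list-like object, including a `Manager().list()`.

```python
from typing import MutableSequence, Optional


def acquire_database(pool: MutableSequence[bool]) -> Optional[int]:
    """Claim the first free slot and return its index, or None if all are busy."""
    for index, free in enumerate(pool):
        if free:
            pool[index] = False  # mark as in use
            return index
    return None


def release_database(pool: MutableSequence[bool], index: int) -> None:
    """Mark a slot as free again once its window has been processed."""
    pool[index] = True


if __name__ == "__main__":
    pool = [True, True]
    first = acquire_database(pool)
    second = acquire_database(pool)
    print(first, second, acquire_database(pool))
    release_database(pool, first)
    print(acquire_database(pool))
```

Note that the check-and-set in `acquire_database` is not atomic. That is fine when only one process hands out slots; if several processes claim databases concurrently, the call would need to be guarded by a lock.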
from multiprocessing import Process

def start_reasoning(message_group, box):
    # get an available database (the check for available databases is performed upfront)
    db = db_pool.index(True)
    # set database to used
    db_pool[db] = False
    # start a separate process
    p = Process(target=reason_window, args=(message_group, db, box))
    p.start()
    return p, db

def reason_window(message_group, db_i, box):
    # make connection with database
    try:
        with stardog.Connection('db' + str(db_i), **conn_details) as conn:
            # start reasoning
            reason(conn, message_group, db_i, box)
    except Exception as e:
        print(e)

def reason(conn, message_group, db_i, box):
    # get window from message group
    message = message_group.pop()
    # add the window to the database (updates need a transaction)
    conn.begin()
    conn.add(stardog.content.Raw(message))
    conn.commit()
    # start reasoning
    try:
        conn.begin(reasoning=True)
        res = conn.select("""
            ASK {
                # rule specific content
                ?anomaly1 <http://example.com/temp_raised_high> ?type1 .
            }
        """, reasoning=True)
        # check result and report
        if res['boolean']:
            report_anomaly(message, box, conn)
    except Exception as e:
        print(e)
    # clean database before accepting new window
    conn.rollback()
    conn.begin()
    conn.remove(stardog.content.Raw(message))
    conn.commit()
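The `report_anomaly` function is not shown above. As a hedged sketch of the last step in the pipeline (pushing the inference back to the streaming platform), the following builds a small event and hands it to a producer. The topic name, the event fields and the function names are assumptions for illustration, and the signature differs from the `report_anomaly(message, box, conn)` call above. The producer is passed in, so any object with `send` and `flush` methods works.

```python
import json

ANOMALY_TOPIC = "anomalies"  # hypothetical topic name


def build_anomaly_event(box: str, window: str) -> dict:
    """Describe a detected temperature rise as a JSON-serialisable dict."""
    return {"box": box, "anomaly": "temp_raised_high", "window": window}


def publish_anomaly(producer, box: str, window: str, topic: str = ANOMALY_TOPIC) -> dict:
    """Send the event with the given producer and return what was sent."""
    event = build_anomaly_event(box, window)
    producer.send(topic, event)
    producer.flush()  # make sure the event leaves the client buffer
    return event


# With kafka-python, a suitable producer could be created like this
# (KAFKA_BROKER_URL is the same configuration constant used by the consumer):
#
#   from kafka import KafkaProducer
#   producer = KafkaProducer(
#       bootstrap_servers=KAFKA_BROKER_URL,
#       value_serializer=lambda v: json.dumps(v).encode("utf-8"),
#   )
```

Keeping the event construction separate from the sending makes the reporting logic easy to test without a running broker.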
The rule used in our example is rather simple and based on the SOSA ontology. It is written in the Stardog Rule Syntax.
@prefix : <http://example.org/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .

IF {
    ?h1 <http://www.w3.org/ns/sosa/hasSimpleResult> ?r1 .
    ?h2 <http://www.w3.org/ns/sosa/hasSimpleResult> ?r2 .
    FILTER (?r2 - ?r1 > 10) .
    ?h1 <http://example.com#hasEpochTime> ?t1 .
    ?h2 <http://example.com#hasEpochTime> ?t2 .
    FILTER (?t1 < ?t2) .
}
THEN {
    ?h1 <http://example.com/temp_raised_high> ?t2 .
}
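To sanity-check what the rule should infer for a given window, its condition can be mirrored in a few lines of plain Python. This is only an illustrative equivalent, not part of the example code: the `Observation` type and the function name are hypothetical, while the threshold of 10 and the strict timestamp comparison are taken from the two `FILTER` clauses above.

```python
from typing import NamedTuple


class Observation(NamedTuple):
    result: float     # value of sosa:hasSimpleResult
    epoch_time: int   # value of hasEpochTime


def temp_raised_high(a: Observation, b: Observation, threshold: float = 10.0) -> bool:
    """True if the later observation exceeds the earlier one by more than `threshold`."""
    # The rule matches any ordered pair (h1, h2), so try both assignments.
    for h1, h2 in ((a, b), (b, a)):
        if h2.result - h1.result > threshold and h1.epoch_time < h2.epoch_time:
            return True
    return False


if __name__ == "__main__":
    print(temp_raised_high(Observation(18.0, 100), Observation(30.5, 160)))
    print(temp_raised_high(Observation(18.0, 100), Observation(28.0, 160)))
```

A helper like this can serve as a test oracle: for the same window, the `ASK` query with reasoning enabled should return the same boolean.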
We observe that the pool yields a nearly linear improvement in total throughput as the number of databases grows (that is, 9-10 in our case). This should hold as long as the size of the pool does not exceed the number of CPUs in the system. Naturally, reasoning performance also depends on the complexity of the rule and any additional schema.
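One simple way to respect that limit is to cap the pool size at the CPU count when the pool is created. The sketch below assumes the Python script and the Stardog server run on the same machine, which may not match your deployment; the helper name `pool_size` is hypothetical.

```python
import os
from typing import Optional


def pool_size(requested: int, cpus: Optional[int] = None) -> int:
    """Cap the requested number of databases at the number of available CPUs."""
    # os.cpu_count() may return None when the count cannot be determined
    available = cpus if cpus is not None else (os.cpu_count() or 1)
    return max(1, min(requested, available))


if __name__ == "__main__":
    print(pool_size(10))
```

The result could then be used in place of a hard-coded pool size.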
In this blog, we showed how the Stardog Enterprise Knowledge Graph platform could be used to perform semantic stream reasoning. The full source code of the thermostat example is available and consists of three different Docker containers:
Two docker-compose files are available, both to set up the Kafka streaming platform and to run the example.
This blog post and the code are delivered by the IDLAB research group of Ghent University - imec from Belgium. If more advanced semantic stream reasoning is required, have a look at StreamingMASSIF.
Learn more about Stardog’s Enterprise Knowledge Graph Platform and discover how it works. If you would like to submit your research to Stardog Labs, contact us to share your project details!