Stream Reasoning With Stardog

Guest author Bram Steenwinckel Mar 18, 2020

Guest author Bram Steenwinckel describes how to perform semantic reasoning over streaming data in your knowledge graph.

This blog post and the code are delivered by the IDLAB research group of Ghent University - imec in Belgium.

Semantic reasoning over streaming data

Semantic reasoning with streaming data does not necessarily require specialised formalisms and tools. You can perform stream reasoning using Stardog with a relatively simple Python extension, which I’ll demonstrate below.

Industry 4.0: Hello data

Companies around the world are connecting their equipment, machinery and even people to the worldwide web. The Internet of Things (IoT for short) defines these connected things as smart devices, all of which are producing floods of data.

This data has become a target for business intelligence and analytics to generate important business insights. However, the raw data is often too opaque to process directly, so a popular approach is to annotate it with metadata and integrate it into a knowledge graph (possibly after intermediate filtering and aggregation). Semantically enriched graph data can be stored in Stardog for subsequent analytics in the form of logical or statistical reasoning.

There are, however, important practical cases where analysing static data is of less interest because one wants to react directly to behavioural changes seen in the data.

Take, for example, a smart thermostat installed at your home, which has a temperature sensor producing observations. When the temperature is getting low, we want our heating devices to be switched on. If we first offload all the data acquired during the day, we can create an efficient heating plan for the following days. But this approach is neither very reactive nor adaptive. We want our system to analyse the data in near real time and take action when necessary.

Reacting to or analysing the data as it arrives may sound simple in the context of the smart thermostat example, but the same paradigm offers companies a wide range of exciting use cases, such as predictive maintenance.

Adding semantics to the stream

Instead of semantically enriching the data when (or after) it hits the database, we could also semantify the data when it leaves the sensor. Raw observations then become a stream of semantic observations.

Most of the time, these semantic observations have additional metadata, specifying the generating sensors, the operating behaviour or other characteristics defined by a semantic model or ontology. Some great tools to start semantifying your raw sensor data, or data in general, can be found online.
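
As a rough illustration of what such a semantic observation might look like, the sketch below turns a raw temperature reading into a small Turtle document. It uses the SOSA vocabulary and the `hasEpochTime` property that the rule in this post relies on. The `EX` namespace for observation and sensor identifiers and the `to_semantic_observation` helper are hypothetical names chosen for this example, not part of the original code.

```python
SOSA = "http://www.w3.org/ns/sosa/"
EX = "http://example.com#"  # hypothetical namespace for this sketch


def to_semantic_observation(obs_id: str, sensor_id: str, value: float, epoch: int) -> str:
    """Serialise one raw reading as a Turtle snippet (illustrative only)."""
    return (
        f"<{EX}{obs_id}> a <{SOSA}Observation> ;\n"
        f"    <{SOSA}madeBySensor> <{EX}{sensor_id}> ;\n"
        f"    <{SOSA}hasSimpleResult> {float(value)} ;\n"
        f"    <{EX}hasEpochTime> {int(epoch)} .\n"
    )


if __name__ == "__main__":
    # one temperature reading of 19.5 degrees from a hypothetical sensor
    print(to_semantic_observation("obs1", "thermostat1", 19.5, 1584523200))
```

A string like this is what would be sent over the stream instead of the bare number, so that downstream consumers can load it into a graph without extra mapping.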

Once semantically enriched data is streaming, one can ask if it is possible to perform reasoning on the fly without storing everything in a single, global knowledge graph upfront. One example would be applying reasoning rules encoding important business logic.

The answer is yes: it is possible with Stardog, as the following example will demonstrate.

Use case: a smart thermostat

Let’s get deeper into the smart thermostat example. The diagram below shows the different steps of rule-based reasoning in Stardog over a stream of temperature observations.

[Figure: steps of rule-based reasoning in Stardog over a stream of temperature observations]

First, the semantic data from our thermostat flows into a stream processing platform such as Apache Kafka. Next, a Kafka consumer loads all the observations into a Stardog database. The number of loaded observations (or the window) depends on the use case. Afterwards, the database starts reasoning on the given data, taking into account the predefined rules and the ontology. Both the ontology and rules are pre-loaded when the database for the window of observations is provisioned. Finally, the inferences are pushed back to the streaming platform using a Kafka producer. As the results are also streamed, other operations can depend on them or act on them.

If we want, for example, to determine a rise in temperature in a streaming fashion:

  • We can load the semantic temperature observations into our stream processing platform.

  • Semantic data is loaded into a Stardog database using a sliding window containing two observations.

  • A simple semantic rule checks whether the temperature value of the first observation is sufficiently lower than that of the second one.

  • The result is pushed back to the streaming platform.

It’s now time for some technical details of these processing steps.

Consuming semantic streams with Kafka

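We use a simple Python script to build a Kafka consumer and read the observations from the streaming platform.

import json

from kafka import KafkaConsumer  # kafka-python package

# SEMANTIC_EVENTS_TOPIC and KAFKA_BROKER_URL are configuration constants defined elsewhere

def _initialise():
  # consumer that decodes every message value from JSON
  consumer = KafkaConsumer(
      SEMANTIC_EVENTS_TOPIC,
      bootstrap_servers=KAFKA_BROKER_URL,
      value_deserializer=lambda value: json.loads(value),
  )
  return consumer

if __name__ == '__main__':
  consumer = _initialise()
  ...
  for message in consumer:
    # add each incoming observation to the current sliding window
    append_message(message, 'thermostat','temperature')
    #DO something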

As discussed, we will analyse the semantic observations in a sliding window of 2:

def append_message(message, box, metric):
    # window_queue[box][metric] is a list of windows
    # (assumed here to start out holding one empty window)
    windows = window_queue[box][metric]
    if len(windows[-1]) < 2:
        # there is some space left in our window
        windows[-1].append(message)
    else:
        # take the last message from the previous window
        p_message = windows[-1][1:]

        # add a new window to our window queue
        windows.append([])

        # add the last known message
        windows[-1].extend(p_message)

        # add the current message
        windows[-1].append(message)

We use a window queue to keep track of all the sliding windows. As data can be streamed at a rather high rate, we must ensure the windows are analysed in order. The next step is to load the window into a Stardog database to start the reasoning process.
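
The same windowing idea can be sketched more compactly with a bounded `collections.deque`. This is an alternative illustration rather than the code used in the example, and the `sliding_windows` name is hypothetical.

```python
from collections import deque
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def sliding_windows(stream: Iterable[T], size: int = 2) -> Iterator[List[T]]:
    """Yield every full window of `size` consecutive items, in arrival order."""
    window = deque(maxlen=size)  # the oldest item is dropped automatically
    for item in stream:
        window.append(item)
        if len(window) == size:
            yield list(window)


if __name__ == "__main__":
    # three readings produce two overlapping windows of two readings each
    for w in sliding_windows([18.0, 18.5, 30.0]):
        print(w)
```

Because the generator yields windows strictly in arrival order, it preserves the ordering requirement mentioned above as long as a single consumer feeds it.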

Provisioning Stardog databases for stream reasoning

Before we can start reasoning, we should launch Stardog and get the data into a database. The naive approach would be to create a fresh database for each window. That would severely impact the latency and throughput of the platform, since the input rate can be high while database initialisation is a rather heavyweight operation (it needs to set up the storage backend with all indices and data structures for transactions, and in addition pre-process the rules). Instead, we initialise a fixed-size pool of Stardog databases, each containing the rule (and, if needed, any additional ontology) and ready for use.

from multiprocessing import Manager, Process

import stardog

# Multiprocessing manager used to keep track of all databases
# (True means the database is free)
manager = Manager()
db_pool = manager.list([True for x in range(0, DATABASES)])

# create a pool of DATABASES databases (10 in this example)
for i in range(0, DATABASES):
  try:
    # drop any database left over from a previous run
    with stardog.Admin(**conn_details) as admin:
        admin.database('db'+str(i)).drop()
  except Exception:
    print("no database to drop")

  with stardog.Admin(**conn_details) as admin:
      admin.new_database('db'+str(i))
      with stardog.Connection('db'+str(i), **conn_details) as conn:
          conn.begin()
          # preload the rule
          conn.add(stardog.content.File('rule.ttl'))
          conn.commit()

This setup enables very fast provisioning of databases when data arrives. In addition, the pool supports distributing the workload over multiple databases and reasoning on multiple windows in parallel which further boosts the throughput of the system.

def launch_reasoning(message_group, box):
  # launcher, renamed here so it no longer shares a name with reason_window below
  # get an available database (availability is checked upfront)
  db = db_pool.index(True)
  # mark the database as used
  db_pool[db] = False
  # start a separate process
  p = Process(target=reason_window, args=(message_group, db, box))
  p.start()
  return p, db

def reason_window(message_group, db_i, box):
  # make connection with database
  try:
    with stardog.Connection('db'+str(db_i), **conn_details) as conn:
      # start reasoning
      reason(conn, message_group, db_i, box)
  except Exception as e:
      print(e)

def reason(conn, message_group, db_i, box):
  # get window from message group
  message = message_group.pop()

  # Start reasoning
  try:
    # open a reasoning transaction first, since data is added inside a transaction
    conn.begin(reasoning=True)
    # add message to database
    conn.add(stardog.content.Raw(message))
    res = conn.select("""
      ASK {
        # rule specific content
        ?anomaly1 <http://example.com/temp_raised_high> ?type1 .
      }
    """, reasoning=True)

    # check result and report
    if res['boolean']:
        report_anomaly(message, box, conn)

  except Exception as e:
    print(e)

  # clean database before accepting new window
  conn.rollback()
  conn.begin()
  conn.remove(stardog.content.Raw(message))
  conn.commit()
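
The `report_anomaly` helper is not shown in this post. One possible shape, given as a sketch under assumptions rather than the original implementation, is to wrap the window in a small JSON-serialisable payload and hand it to a Kafka producer. The `INFERENCES_TOPIC` name, the payload fields, and the `build_anomaly_payload` and `publish_anomaly` helpers are all hypothetical, and their signatures differ from the `report_anomaly(message, box, conn)` call above. The producer is passed in so that the logic can run without a broker; with kafka-python it would typically be a `KafkaProducer` configured with a JSON `value_serializer`.

```python
import json
import time

INFERENCES_TOPIC = "inferences"  # hypothetical topic name


def build_anomaly_payload(window, box, rule="temp_raised_high"):
    """Describe a detected anomaly as a JSON-serialisable dict."""
    return {
        "box": box,
        "rule": rule,
        "window": window,
        "reported_at": int(time.time()),
    }


def publish_anomaly(producer, window, box, topic=INFERENCES_TOPIC):
    """Send the payload with any object exposing send(topic, value=...)."""
    payload = build_anomaly_payload(window, box)
    producer.send(topic, value=payload)
    return payload


class PrintingProducer:
    """Stand-in for a real producer so the sketch runs without a broker."""

    def send(self, topic, value=None):
        print(topic, json.dumps(value))


if __name__ == "__main__":
    publish_anomaly(PrintingProducer(), ["obs1", "obs2"], "thermostat")
```

Pushing the result to its own topic keeps the inference stream separate from the raw observations, so downstream consumers can subscribe only to what they need.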

The rule used in our example is rather simple and based on the SOSA ontology. It is written in the Stardog Rule Syntax.

@prefix : <http://example.org/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .

IF {
    ?h1 <http://www.w3.org/ns/sosa/hasSimpleResult> ?r1 .
    ?h2 <http://www.w3.org/ns/sosa/hasSimpleResult> ?r2 .
    FILTER (?r2-?r1>10) .
    ?h1 <http://example.com#hasEpochTime> ?t1.
    ?h2 <http://example.com#hasEpochTime> ?t2.
    FILTER(?t1<?t2) .
}
THEN {
   ?h1 <http://example.com/temp_raised_high> ?t2 .
}
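
To make the logic of the rule concrete, here is a plain-Python rendering of the same two conditions. It is only an illustration of what the rule checks, not a replacement for reasoning in Stardog. The `Observation` type and the function names are hypothetical.

```python
from itertools import permutations
from typing import NamedTuple, Sequence


class Observation(NamedTuple):
    result: float    # value of sosa:hasSimpleResult
    epoch_time: int  # value of hasEpochTime


def temp_raised_high(first: Observation, second: Observation, threshold: float = 10) -> bool:
    """True when `second` is later than `first` and more than `threshold` higher."""
    return first.epoch_time < second.epoch_time and second.result - first.result > threshold


def window_has_rise(window: Sequence[Observation]) -> bool:
    """Check every ordered pair in the window, as the rule does with ?h1 and ?h2."""
    return any(temp_raised_high(a, b) for a, b in permutations(window, 2))


if __name__ == "__main__":
    # a jump from 18 to 30 degrees between two consecutive observations
    print(window_has_rise([Observation(18.0, 1), Observation(30.0, 2)]))
```

Note that the comparison is strict, so a rise of exactly 10 does not trigger the rule.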

We observe that total throughput improves nearly linearly with the number of databases in the pool (that is, 9-10 in our case). This should hold as long as the size of the pool does not exceed the number of CPUs in the system. Naturally, reasoning performance also depends on the complexity of the rule and any additional schema.
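
The sizing advice above, together with the bookkeeping of free and busy databases, can be sketched as follows. This is a minimal illustration under assumptions: `pool_size` and `DatabasePool` are hypothetical names, and the lock shown is a thread lock. With separate processes, as in the example, a lock obtained from the multiprocessing manager would play the same role.

```python
import os
import threading
from typing import List, Optional


def pool_size(requested: int) -> int:
    """Cap the requested pool size at the number of available CPUs."""
    return max(1, min(requested, os.cpu_count() or 1))


class DatabasePool:
    """Tracks which database indices are free (True) or in use (False)."""

    def __init__(self, size: int) -> None:
        self._free: List[bool] = [True] * size
        self._lock = threading.Lock()

    def acquire(self) -> Optional[int]:
        """Return the index of a free database, or None if all are busy."""
        with self._lock:
            try:
                index = self._free.index(True)
            except ValueError:
                return None
            self._free[index] = False
            return index

    def release(self, index: int) -> None:
        """Mark a database as free again once its window has been cleaned up."""
        with self._lock:
            self._free[index] = True


if __name__ == "__main__":
    pool = DatabasePool(pool_size(10))
    db = pool.acquire()
    print("using database", db)
    pool.release(db)
```

Guarding the lookup and the update with one lock avoids two workers claiming the same database when windows arrive at the same moment.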

Try it out!

In this blog post, we showed how the Stardog Enterprise Knowledge Graph platform can be used to perform semantic stream reasoning. The full source code of the thermostat example is available and consists of three different Docker containers:

  1. a Docker container set up to produce some fake semantic observations,
  2. a Docker container to set up Stardog, and
  3. the Stardog consumer described above, which handles the reasoning, windowing and pooling.

Two docker-compose files are available: one to set up the Kafka streaming platform and one to run the example.

If more advanced semantic stream reasoning is required, have a look at StreamingMASSIF.
