The workflow configuration allows fetching a database table chunk-wise, so it can be loaded and processed in portions rather than in its entirety. This makes it possible to regularly check a database for new data as well as to process large amounts of data with a more economical processor utilization. In the use case described here, we outline how this can be done for a MariaDB instance and its query syntax using OFFSET. We use a dummy ordering of the table rows, although it would be much better practice to have a dedicated column for this purpose, e.g., an auto-incremented integer or a timestamp reflecting when a row was inserted into the table, since it is not guaranteed that the dummy ordering stays unchanged over time.
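As a sketch of the underlying paging pattern in MariaDB, the following query reads the second chunk of 100 rows. The table name products and the ordering column id are assumptions for illustration only; the explicit ORDER BY on a dedicated column is what keeps the paging stable between runs, whereas the dummy ordering mentioned above omits this clause.

Example Chunk Query

-- read rows 101 to 200 of a hypothetical products table
SELECT *
FROM products
ORDER BY id   -- dedicated, stable ordering column (recommended)
LIMIT 100
OFFSET 100;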

Creation of JDBC Dataset

In order to load data from a JDBC Dataset, you first need to create such a dataset. In the following example, we imported the product data used in this tutorial into a MariaDB instance, such that we can access it by means of a JDBC connection (1). Once you have created the dataset in your Data Manager or Data Integration component, you need to edit the following properties:
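For orientation, a JDBC connection URL for MariaDB typically follows the pattern jdbc:mariadb://<host>:<port>/<database>; host, port, and database name are placeholders that have to match your own instance.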

The table should not be read completely but rather be queried according to an offset, so the query strategy has to refer to a given source query (2). Since this source query will be injected on-the-fly, you do not need to specify it. However, in order for the dataset to be completely defined, there has to be a value in (3). We have chosen "-" as this value, which will cause any workflow using this dataset to fail if no source query can be generated on-the-fly. Nevertheless, any other query could be used here, and it makes sense to use a functioning query if you still need test data to create transformation tasks with this dataset as a source (see the example below). Furthermore, if you want your chunks to have a specific size, you need to specify a limit (4) defining this size.
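A minimal sketch of such a functioning placeholder query, assuming a hypothetical table named products, could look like this; it only returns a small sample so that mapping rules can be developed against real columns before the on-the-fly query takes over:

Example Placeholder Source Query

-- small static sample, only used while developing the mapping rules
SELECT * FROM products LIMIT 10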

Extension of JDBC Dataset Metadata

Since Data Integration overwrites additional metadata in its project graphs, it is necessary to extend the graph to which the JDBC dataset belongs. In order to do so, you need an additional graph extending the original project graph, the creation of which can be prepared as shown in the following:

In Data Manager, the Explore View (1) has to be opened, in which you can select the respective project graph (2). In the Turtle View (3) of this graph, you can see its corresponding URI (4), which we need when creating the extension graph in order to import the original project graph. Furthermore, we need the URI of the JDBC dataset, which can be retrieved in the following way:

If you view all datasets (1) of the project graph, you need to select the JDBC dataset we want to extend, which is called Product Data (2) in our example. In the Turtle View (3) of this dataset, you can see its corresponding URI (4), which we need to define the initial offset for the data integration workflow. Thus, we have everything needed to create the extension graph, which can be created by selecting Add new graph in the menu (5) with the URI http://di.eccenca.com/project/$projectname/extension, which in the context of our example is http://di.eccenca.com/project/cmem/extension. If you want to, you can upload the following file, which uses the URIs we have looked up in Data Manager. However, it is also possible to add the contained triples in the Turtle View of the newly created extension graph.

extension.nt

<http://di.eccenca.com/project/cmem/extension> <http://www.w3.org/2002/07/owl#imports> <http://di.eccenca.com/project/cmem> . # import the original project
<http://dataintegration.eccenca.com/cmem/Product_Data> <https://vocab.eccenca.com/di/functions/param_Jdbc_lastOffset> "0" . # set the initial offset to zero

Dataset Reconfiguration

For the next steps, we assume that a transformation task as well as a Knowledge Graph, into which the dataset's content will be semantically lifted, have already been created as outlined in Lift data from tabular data such as CSV, XSLX or database tables, such that the following workflow can be defined:

The workflow as well as the transformation task it uses won't be able to run successfully yet because there is no valid source query that can be used to load the database content. To inject a proper query, we need to create a meta-dataset that has access to the original dataset's metadata. This dataset can be created in the following way:

You have to click the Add button (1) at the Datasets node of the Data Integration Workspace. In the following dialog, you have to choose Knowledge Graph (2) as the type of the dataset to be created, as well as a descriptive Name (3) indicating that this dataset describes the original dataset. It is also necessary to specify the graph in which the dataset is defined (4), i.e. http://di.eccenca.com/project/cmem/extension in our example, and the URI of the original dataset itself (5), i.e. http://dataintegration.eccenca.com/cmem/Product_Data. Finally, the dataset will be created by clicking on Save (6).

This tutorial covers the scenario of loading a single dataset incrementally. If you want to load more than one dataset this way, it can make more sense to create a meta-dataset that contains the project metadata rather than the metadata of a single dataset. In this case, specifying the dataset's URI (5) can be seen as optional because it can be part of the definition of the transformation tasks. Thus, only one meta-dataset is necessary, but a transformation task has to be created for each of the original datasets, which results in fewer nodes in a workflow containing more than one incrementally loaded dataset. However, if each dataset has its own workflow, there are no benefits arising from this approach: you either have n meta-datasets and a single transformation task or a single project meta-dataset and n transformation tasks for n datasets that need to be loaded incrementally.

The source query is injected by a transformation task that has to be created in the following way:


You have to click the Add button (1) at the Transformation tasks node of the Data Integration Workspace. In the following dialog, you have to choose a descriptive Name (2) indicating that this transformation task composes a source query. It is also necessary to specify the source meta-dataset (3) used to retrieve the variables of the source query to be created, i.e. Product Dataset Metadata in our example. If you have chosen to create a dataset that includes the complete project's metadata rather than the metadata of a single dataset, you additionally have to specify the dataset you want to load incrementally by a Source Restriction (4). This is the only difference between the dataset-specific transformation tasks, hence you can easily clone them after you have successfully created the value formula of the first transformation. Finally, the transformation task will be created by clicking on OK (5).

The transformation task needs only a single value mapping. The mapping rule and the corresponding value formula need to be created in the following way:

The Target property (1) of the mapping needs to be set to sourceQuery as described in https://documentation.eccenca.com/latest/build/workflow-reconfiguration. By clicking on the Edit button (2), the formula editor is opened, in which you have to specify from which table the data is read (3), i.e. <https://vocab.eccenca.com/di/functions/param_Jdbc_table>, how many rows will be loaded (4), i.e. <https://vocab.eccenca.com/di/functions/param_Jdbc_limit>, and which row will be loaded first (5), i.e. <https://vocab.eccenca.com/di/functions/param_Jdbc_lastOffset>. All components of the source query will be concatenated with " " as their glue (6). If the mapping rule is defined correctly, the example data will generate the SQL query to be used to load the dataset (7), as sketched below. Finally, we can create the following workflow that will load only the first 100 rows of the Product Data dataset:
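As a sketch, assuming the Jdbc table parameter resolves to a hypothetical table named products, the limit is 100, and the current lastOffset is 0, the concatenation in (6) yields the following query; after the offset has been updated as described in the next section, the subsequent run would use OFFSET 100 instead:

Generated Source Query (example)

SELECT * FROM products LIMIT 100 OFFSET 0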

Update Offset for Next Workflow Execution

Up to now, the workflow will always load the first 100 rows of the database because the offset defined in the meta-dataset is not updated according to the number of products that have been processed during the workflow execution. We use the following SPARQL SELECT queries to retrieve the necessary metadata:

Number of Products

SELECT (COUNT(?product) AS ?products)
WHERE{
  GRAPH <urn:product-knowledge> {
    ?product a <urn:Product>
  }
}

Last Offset

PREFIX project: <http://di.eccenca.com/project/cmem/extension>
PREFIX dataset: <http://dataintegration.eccenca.com/cmem/>
PREFIX func: <https://vocab.eccenca.com/di/functions/param_Jdbc_>

SELECT ?lastOffset
WHERE {
  GRAPH project: {
    dataset:Product_Data func:lastOffset ?lastOffset
  }
}

We will use both queries as subqueries in a SPARQL Update query that updates the offset. The number of products contained in the Knowledge Graph defines the offset to be included in the source query of the next workflow execution, while the old offset has to be removed from the meta-dataset. Thus, we can create the following query and the workflow task that will use this query:

Combined Query

PREFIX project: <http://di.eccenca.com/project/cmem/extension>
PREFIX dataset: <http://dataintegration.eccenca.com/cmem/>
PREFIX func: <https://vocab.eccenca.com/di/functions/param_Jdbc_>

WITH project:
DELETE {
  dataset:Product_Data func:lastOffset ?lastOffset .
}
INSERT{
  dataset:Product_Data func:lastOffset ?newOffset .
}
WHERE {
  {
    SELECT ?lastOffset
    WHERE {
      GRAPH project: {
        dataset:Product_Data func:lastOffset ?lastOffset
      }
    }
  }
  {
    SELECT (STR(COUNT(?product)) AS ?newOffset)
    WHERE{
      GRAPH <urn:product-knowledge> {
        ?product a <urn:Product>
      }
    }
  }
}

You have to click the Add button (1) at the Others node of the Data Integration Workspace. In the following dialog, you have to choose SPARQL Update query as the task's Type (2) as well as a descriptive Name (3) indicating that this task updates the offset of the source query. The SPARQL Update query (4) itself needs to be inserted, too. Finally, the task will be created by clicking on OK (5). Once you have created this task, you can extend the workflow in the following way and, thus, write the updated offset into the meta-dataset, such that the next execution of the workflow will load the next 100 products: