DataIntegration¤
This section is intended to be a reference for all available eccenca DataIntegration configuration options.The configuration format is based on HOCON.
The following sections introduce the most important configuration parameters. The entire list of available configuration parameters can be found in the dataintegration.conf file found in the release package.
OAuth¤
Authorization in eccenca DataIntegration is based on OAuth. Typically the eccenca DataPlatform is used as an OAuth endpoint, but external endpoints can be used as well. The Authorization Code Grant workflow is used for retrieving the OAuth token. The default configuration is the following:
DataPlatform configuration¤
eccenca DataIntegration can only be run by OAuth users that are granted the urn:eccenca:di
action by the DataPlatform. An example configuration that grants all users from a predefined group dataIntegrators
access to DataIntegration is shown in the following:
In the shown configuration, the users also get access to all graphs in the RDF store. This is not a requirement for working with DataIntegration. Access may also be restricted to graphs that the data integration users are allowed to work with.
Note
The urn:elds-backend-all-actions
action set also includes the urn:eccenca:di
action, i.e., users that are granted urn:elds-backend-all-actions
also get access to eccencca DataIntegration.
In order to activate OAuth using the eccenca DataPlatform, the following minimal configuration is required:
Super User¤
By default, the workspace is loaded the first time a user opens eccenca DataIntegration in the browser using their credentials. Any scheduler that is part of a project must be started manually.
By configuring a super user, the workspace will be loaded at startup. After loading, all schedulers will be started using the credentials of the super user.
In addition to the configuration of the eccencaDataPlatform according to the previous section, a super user is configured by specifying the following two parameters:
Note
The client credentials grant type is used to retrieve a token for the super user. Note that the schedulers are only started automatically when running in production mode.
Workspace Providers¤
The backend that holds the workspace can be configured using the workspace.provider.plugin
parameter in dataintegration.conf
The following sections describe the available workspace provider plugins and how they are configured.
RDF-store Workspace - backend¤
When running in Corporate Memory, by default the workspace is held in the RDF store configured in the eccenca DataPlatform.
The workspace is held using the eccenca DataPlatform, i.e., it requires the eccencaDataPlatform.url
parameter to be configured.
This workspace can be configured using the following parameter:
Parameter | Type | Description | Default |
---|---|---|---|
loadAllVocabularyPrefixes | boolean | Load prefixes defined by all known vocabularies. | false |
loadInstalledVocabularyPrefixes | boolean | Load prefixes defined by vocabularies that are actually loaded in the RDF store. | true |
vocabularyGraph | String | The graph that contains the vocabulary meta data. | https://ns.eccenca.com/example/data/vocabs/ |
cacheDir | String | Optional directory to persist caches between restarts. If empty, caches will be held in-memory and will be reloaded on each start. | <empty> |
By default, prefixes are loaded from all installed vocabularies. Only one of loadAllVocabularyPrefixes and loadInstalledVocabularyPrefixes can be set to true.
The corresponding configuration in your dataintegration.conf
looks like the following:
eccencaDataPlatform.url = <DATAPLATFORM_URL>
...
workspace.provider.plugin = backend
...
workspace.provider.backend = {
# Load prefixes defined by all known vocabularies.
loadAllVocabularyPrefixes = false
# Load prefixes defined by vocabularies that are actually loaded in the RDF store.
loadInstalledVocabularyPrefixes = true
# The graph that contains the vocabulary meta data.
vocabularyGraph = "https://ns.eccenca.com/example/data/vocabs/"
# Optional directory to persist caches between restarts. If empty, caches will be held in-memory and will be reloaded on each start.
cacheDir = ""
}
File-based Workspace - file¤
The workspace can also be held on the filesystem.
This workspace can be configured using the following parameter:
Parameter | Type | Description | Default |
---|---|---|---|
dir | String | The directory to which the workspace is persisted. | no default |
The corresponding configuration in your dataintegration.conf
looks like the following:
Hybrid workspace - fileAndDataPlatform¤
The so called hybrid workspace holds the workspace in the filesystem and in eccenca DataPlatform simultaneously. Each time a task is updated, it is written to both the filesystem and to the project RDF graph. In addition, on each (re)load of the Workspace, the contents of the file based workspace are pushed to the RDF store, to make sure that both workspace backends stay synchronized. Contents of the file system may be changed manually (reload the workspace afterwards). Contents of the RDF store are supposed to be read only and will be overwritten on reload.
The workspace is held using the eccenca DataPlatform, i.e., it requires the eccencaDataPlatform.url
parameter to be configured.
This workspace can be configured using the following parameter:
Parameter | Type | Description | Default |
---|---|---|---|
dir | String | The directory to which the workspace is persisted. | no default |
loadAllVocabularyPrefixes | boolean | Load prefixes defined by all known vocabularies. | false |
loadInstalledVocabularyPrefixes | boolean | Load prefixes defined by vocabularies that are actually loaded in the RDF store. | true |
vocabularyGraph | String | The graph that contains the vocabulary meta data. | https://ns.eccenca.com/example/data/vocabs/ |
failOnDataPlatformError | boolean | If true, whenever an update is triggered that has been pushed to the xml backend, but failed to be pushed to the DataPlatform, the entire request fails. If false, an update error in the DataPlatform will only log a warning. | false |
By default, prefixes are loaded from all installed vocabularies. Only one of loadAllVocabularyPrefixes and loadInstalledVocabularyPrefixes can be set to true.
The corresponding configuration in your dataintegration.conf
looks like the following:
In-memory Workspace - inMemory¤
A workspace provider that holds all projects in memory. All contents will be gone on restart.
The corresponding configuration in your dataintegration.conf
looks like the following:
In-memory RDF Workspace - inMemoryRdfWorkspace¤
A workspace that is held in a in-memory RDF store and loses all its content on restart (mainly used for testing). Needed if operators are used that require an RDF store backend.
This workspace can be configured using the following parameter:
Parameter | Type | Description | Default |
---|---|---|---|
loadAllVocabularyPrefixes | boolean | Load prefixes defined by all known vocabularies. | false |
loadInstalledVocabularyPrefixes | boolean | Load prefixes defined by vocabularies that are actually loaded in the RDF store. | true |
vocabularyGraph | String | The graph that contains the vocabulary meta data. | https://ns.eccenca.com/example/data/vocabs/ |
By default, prefixes are loaded from all installed vocabularies. Only one of loadAllVocabularyPrefixes and loadInstalledVocabularyPrefixes can be set to true.
The corresponding configuration in your dataintegration.conf
looks like the following:
Resource Repositories¤
Project resources are held by a resource repository which is configured using the workspace.repository.plugin
parameter in dataintegration.conf
The following sections describe the available resource repository plugins and how they are configured.
Project Specific Directories - projectFile¤
By default, resources are held in project specific directories.
This plugin can be configured using the following parameter:
Parameter | Type | Description | Default |
---|---|---|---|
dir | String | The directory to which the resources are persisted. | no default |
The corresponding configuration in your dataintegration.conf
looks like the following:
Shared Directory - file¤
Alternatively, all resources across all DataIntegration projects can be held in a single directory on the file system.
This plugin can be configured using the following parameter:
Parameter | Type | Description | Default |
---|---|---|---|
dir | String | The directory to which the resources are persisted. | no default |
The corresponding configuration in your dataintegration.conf
looks like the following:
HDFS resources - hdfs¤
Holds all resources on the HDFS file system.
This plugin can be configured using the following parameter:
Parameter | Type | Description | Default |
---|---|---|---|
path | String | The directory to which the resources are persisted. | no default |
user | String | The hadoop user. | hadoopuser |
The corresponding configuration in your dataintegration.conf
looks like the following:
S3 Bucket, Project Specific - projectS3¤
In addition to storing files in the local filesystem an AWS S3 bucket can be used as the resource repository backend.
To use resources stored on S3 the AWS keyID
, secretKey
need to be configured in the dataintegration.conf
file. This and the region are used to connect to S3. Further, one bucket name has to be given. This is analog to the root folder of filesystem based resource repositories.
This plugin can be configured using the following parameter:
Parameter | Type | Description | Default |
---|---|---|---|
bucket | String | The S3 bucket name. | no default |
accessKeyId | String | The S3 access key ID. | no default |
secretKey | String | The S3 secret key. | no default |
region | String | The AWS region the S3 bucket is located in. | no default |
path | String | OPTIONAL. Path (absolute to the bucket (root)) that defines the folder that will be used to hold the workspace. | no default |
The corresponding configuration in your dataintegration.conf
looks like the following:
S3 Bucket, Shared Directory - s3¤
Holds all resources shared across all your DataIntegration projects in a single S3 bucket.
The available configuration options are the same as for projectS3
.
The corresponding configuration in your dataintegration.conf
looks like the following:
In-Memory - inMemory¤
Resources can also be held in-memory. The corresponding configuration in your dataintegration.conf
looks like the following:
Note
In-memory repositories will be emptied on restart.
No resources - empty¤
In case you do not want to allow to store file resources you can define an empty repository. The empty
resource repository that does not allow storing any resources.
The resource repository can also be specified as empty. The corresponding configuration in your dataintegration.conf
looks like the following:
Execution Report Manager¤
The execution report manager is used to persist execution reports. It allows to retrieve previous reports. you can use it with file and in-memory models. In addition you can specify a retention time. Reports older than this time will be deleted, if a new report is added. The retention time is expressed as a Java Duration
string, see https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html#parse-java.lang.CharSequence- for details.
Disabled - None¤
Discards execution reports and does not persist them.
In-Memory - inMemory¤
Holds the reports in memory.
File based - file¤
Holds the reports in a specified directory on the filesystem.
Internal Datasets¤
Internal datasets hold intermediate results during the execution of a DataIntegration Workflow. The type of internal dataset that is used can be configured. By default an in memory dataset is used for storing data between tasks:
Alternatively, the internal data can also be held in the eccenca DataPlatform:
If the eccenca DataPlatform is not available, an external store may also be specified:
Warning
If an RDF store based internal dataset is used, all internal data is stored in the same graph. For this reason, multiple different internal datasets cannot be used safely in that case.
Timeouts¤
DataIntegration has a number of timeouts to maintain operation while connection issues or problems in other applications occur. The following sections list the most important global timeouts.
Note
In addition to these global timeouts, many datasets, such as the Knowledge Graph dataset, do provide additional timeout parameters. Refer to the documentation of datasets for individual timeout mechanisms of different datasets.
Warning
All timeouts set need to be lower than the gateway timeout in the network infrastructure.
Request timeout¤
The request timeout specifies how long a HTTP(S) request may take until it times out and is closed:
This timeout mechanism can be disabled, by setting the timeout to "infinite"
.
Idle request timeout¤
The idle timeout specifies the maximum inactivity time of a HTTP(S) connection:
The connection will be closed after it has been open for the configured idle timeout, without any request or response being written. This timeout mechanism can be disabled, by setting the timeout to "infinite"
.
DataPlatform timeouts¤
As DataIntegration depends on DataPlatform for managing the Knowledge Graph, it will query the health of DataPlatform as part of its own health check. The timeout to wait for DataPlatform’s response to a health request can be configured:
In addition, there is a timeout when requesting authorization information from the eccenca DataPlatform, which is fixed to 61 seconds.
RDF store timeouts¤
When reading and writing RDF there are a number of timeouts applied, depending on whether the Graph Store protocol or the SPARQL endpoint are used.
For the Graph Store protocol, the following timeouts can be configured:
For the SPARQL endpoint, the following parameters are applicable:
Note
The Knowledge Graph dataset provides additional timeout parameters for more fine-grained control.
Linking execution timeout¤
When executing linking rules there are a number of timeouts to prevent possibly erroneous linkage rules from generating too many links or staling the execution:
Maximum Upload Size¤
By default, the size of uploaded resources is limited to 10 MB. The upload limit can be increased:
While uploading, resources are cached on the disk, i.e., the limit may exceed the size of the available memory.
Provenance¤
By default, no provenance data is written to the RDF store. To enable writing provenance data to a named graph, a provenance plugin needs to be configured.
Provenance output can be configured using the following parameter:
Parameter | Type | Description | Default |
---|---|---|---|
provenance.graph | String | Set the graph where generated provenance will be written to in the RDF workspace provider. | https://ns.eccenca.com/example/data/dataset/ |
provenance.persistWorkflowProvenancePlugin.plugin | String | Provenance plugin to set where provenance data should be written to. Possible options: - rdfWorkflowProvenance - writes provenance data to RDF backend - nopWorkflowProvenance - do NOT write provenance data (disable it) | nopWorkflowProvenance |
To enable provenance output, the following lines can be added to dataintegration.conf
:
Logging¤
Logging for eccenca DataIntegration is based on the Logback logging framework. There are two ways to change the logging behavior from the default, the first is to provide a logback.xml file, the second is to set various logging properties in the dataintegration.conf file.
Logback Configuration File¤
The logback.xml
file can be added to the ${ELDS_HOME}/etc/dataintegration/conf/
folder, from where it is read on application start-up and replaces the default logging config.
Configuration Example¤
The following example logback.xml
file defines a rolling file strategy where files are rotated on a time base (1 day) with a limit of 7 files, which means that the logging files contain a log history of a maximum of 1 week.
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="TIME_BASED_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/opt/elds/var/log/dataintegration.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- daily rollover, history for 1 week -->
<fileNamePattern>/opt/elds/var/log/dataintegration.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="com.eccenca" level="INFO">
<appender-ref ref="TIME_BASED_FILE" />
</logger>
</configuration>
Logging Properties¤
For debugging purposes and smaller adaptions it is possible to change log levels for any logger in the DataIntegration config file. There are following possibilities:
Plugin Configuration¤
The plugin architecture of eccenca DataIntegration allows to configure certain characteristics of the application, e.g. the persistence backend for the workspace.\ A full list over all plugins are given in the eccenca DataIntegration user manual in the sections Plugin Reference as well as Activity Reference.
Blacklisting Plugins¤
In some cases the usage of specific plugins might pose a risk. In order to avoid to load a specific plugin, it can be blacklisted in the configuration file by setting the pluginRegistry.plugins.{pluginID}.enabled
config parameter to false. The parameter takes a comma-separated list of plugin IDs. The corresponding plugins will not be loaded into the plugin registry and can thus not be selected or executed anymore. The plugin ID for each plugin can be found in the Plugin Reference and Activity Reference section of the eccenca DataIntegration user manual.
Example config:
Spark configuration¤
The following chapters describe configuration options relevant for the execution of eccenca DataIntegration workflows on Spark. All options regarding the execution of DataIntegration on Spark are set in the dataintegration.conf
file. The option to define SparkExecutor as the execution engine is execution.manager.plugin
and needs to be changed from the default value:
To the value that specifies the use of the SparkExecutor:
Execution in Spark client Mode¤
Spark provides a simple standalone cluster manager. You can launch a standalone cluster either manually, by starting a master and workers by hand, or by using the provided launch scripts. Spark can still run alongside Hive, Hadoop and other services in this mode. But in general this mode is preferred if only Spark applications are running in a cluster. When multiple cluster applications are running in parallel (e.g. different databases, interpreters or any software running on top of Yarn) or more advanced monitoring is needed the execution with Yarn is often recommended.
For running DataIntegration in client mode the following configuration can be used
Execution in cluster mode¤
DataIntegration application configuration¤
Cluster mode is supported with Apache Yarn only at the moment. To run DataIntegration in cluster mode the following configuration can be used:
DataIntegration cluster deployment configuration¤
In cluster mode one should keep in mind that, normally, DataIntegration will generate a jar and a project export. These artifacts can be copied or send to a cluster and will be executed there via the spark-submit
command. That means the data processing is running in its own remote process separate from the DataIntegration application.
An assembly jar and a workflow can be exported by an activity that belongs to each defined workflow. The activity can be configured in the dataintegration.conf
. There are 3 phases of a deployment: staging, transform, loading and 3 types of artifact compositions as well as some other options deciding the target of the export. First the specified resource are copied to the configured resource folder of a project or a temp folder (staging) and then an action decides how the files are deployed.
Artifacts (spark.deployment.options.artifact):
- ‘jar’: The assembly jar is deployed
- ‘project’: The exported project zip file is deployed (e.g. if the assembly was already globally deployed)
- ‘project-jar’: The jar and the project zip are deployed
The artifacts are copied to the configured resource folder off the project the activity belongs to.
Types (spark.deployment.options.[phase].type, e.g. spark.deployment.options.staging.type="env-script"
):
- ‘script’ A shell script is called to copy the files to the cluster (can be user supplied, contain auth, prepare a DataFactory activity etc.)
- ‘copy’ The resource are copied to a specified folder
- ‘hdfs’ The resource is imported to HDFS
- ‘env-script’ A shell script is loaded from a environment variable
- ‘var-script’ A shell script is loaded from a configuration variable
Other options:
spark.deployment.options.[phase].[typeName]
Depending on the selected deployment type this contains one or more (separated by a comma) targeted local file system or HDFS paths or location of the scripts to run\ e.g.spark.deployment.options.staging.type ="script"
andspark.deployment.options.staging.script="/scripts/script.sh"
spark.deployment.options.overwriteExecution
Boolean value that decides if the ExecuteSparkWorkflow action is overwritten by the deployment action and will run this instead.
Example:
Activity parameters to skip deployment phases¤
In some scenarios (especially deployments where a jar has to be copied to a remote location) it is required that a deployment phase can be skipped. E.g. the jar upload only has to be done once, the upload is defined in the “staging” phase and the spark-submit call in the “transform” phase. The parameter “executeTransform” (reachable via the activity tab) can\ be set to false on the second run to avoid re-uploading artifacts.
Configuration of the assembly jar¤
In cluster mode, usually, we run a Spark Job by submitting an assembly jar to the cluster. This can be seen a command line version of DataIntegration and can also be used manually with ‘spark-submit’. In this case the configuration in the environment the jar runs in should look like this (options are set by the spark-submit configuration and parameters):
Execution in local mode¤
Local mode is mainly for testing, but can be used for deployment on a single server. HDFS and Hive are not required. The following configuration parameters have to be set:
In this mode the parameters and Spark settings appended to the ‘spark-submit’ command will always be used and overwrite configuration settings in other sources.
Configuration and usage of the SqlEndpoint dataset¤
Server Side Configuration¤
General Settings¤
The SqlEndpoint dataset is a table or view behaving analog to a table in a relational database like MySQL. When data is written to an SqlEndpoint dataset, a JDBC server is started and can be queried with any JDBC client. In DataIntegration the SqlEndpoint dataset behaves like any other dataset. It can be used as a target for workflows, be profiled, used as a source for an operation or workflow etc. There are a two of configuration options that are relevant for the JDBC endpoints:
The port on which the JDBC connections will be available is 10005
by default and can be changed in the hive-site.xml
and spark-defaults.conf
configuration files.
Security Settings¤
A secure connection can be configured with the authentification settings in the hive-site.xml
, spark-defaults.conf
and dataintegration.conf
files.
If Hive support is disabled ( enableHiveSupport = false
) or if the property hive.server2.authentication
has the value None
security can be disabled.
There exist a number of option for secure JDBC connections via Thrift and Hive:
- Kerberos
- LDAP
- Custom authentication classes
- User impersonation
- Server and Client Certificates
Eccenca provides a custom Authentification provider which allows to set 1 user/password combination for JDBC connections via:
The authentication provider class name is com.eccenca.di.sql.endpoint.security.SqlEndpointAuth
. To use it the following configuration is needed:
<configuration>
<property>
<name>hive.server2.authentication</name>
<value>CUSTOM</value>
</property>
<property>
<name>hive.server2.custom.authentication.class</name>
<value>com.eccenca.di.sql.endpoint.security.SqlEndpointAuth</value>
<description>
Custom authentication class. Used when property
'hive.server2.authentication' is set to 'CUSTOM'. Provided class
must be a proper implementation of the interface
org.apache.hive.service.auth.PasswdAuthenticationProvider. HiveServer2
will call its Authenticate(user, password) method to authenticate requests.
The implementation may optionally implement Hadoop's
org.apache.hadoop.conf.Configurable class to grab Hive's Configuration object.
</description>
</property>
...
</configuration>
Check the Hive documentation for details: Hive admin manual or the documentation of a Hadoop Distribution (MapR, Hortenworks or AWS and Azure in he cloud etc.). Hadoop distributions usually provides instructions for configuring secure endpoints.
Integration with various authentication providers can be configured and is mostly set up in hive-site.xml
.
SqlEndpoint Dataset Parameters¤
The dataset only requires that the tableNamePrefix
parameters is given. This will be used as the prefix for the names of the generated tables. When a set of Entities is written to the endpoint a view is generated for each entity type (defined by an rdf_type
attribute). That means that the mapping or data source that are used as input for the SqlEndpoint need to have a type or require a user defined type mapping.
The operator has a compatibility mode. Using it will avoid complex types such as Arrays. When arrays exit in the input they are converted to a String using the given arraySeperator
.
SqlEndpoint Activity¤
The activity will start automatically, when the SqlEndpoint is used as a data sink and Dataintegration is configured to make the SqlEndpoint accessible remotely.
When the activity is started and running it returns the server status and JDBC Url as its value.
Stopping the activity will drop all views generated by the activity. It can be restarted by rerunning the workflow containing it as a sink.
Remote Client Configuration (via JDBC and ODBC)¤
Within Dataintegration the SqlEndpoint can be used as a source or sink like any other dataset. If the startThriftServer option is set to true
access via JDBC or ODBC is possible.
ODBC and JDBC drivers can be used to connect to relational databases. These drivers are used by clients like, Excel, PowerBI or other BI tools and transform standard SQL-queries to Hive-QL queries and handle the respective query results. Hive-QL support a subset of the SQL-92 standard. Depending on the complexity of the driver it – in case of a simple driver – supports the same subset or more modern standards. JDBC drivers are similar to ODBC ones, but serve as connectors for Java applications. When selecting a version of a driver the client operating system and its type (32bit/64 bit) are the most important factors. The version of the client drivers sometimes is the same as the servers. When no version of a driver is given the newest driver of the vendor should work, as it should be backwards compatible.
Any JDBC or ODBC client can connect to a JDBC endpoint provided by an SqlEndpoint dataset. SqlEndpoint uses the same query processing as Hive, therefore the requirements for the client are:
- A JDBC driver compatible with Hive 1.2.1 (platform independent driver org.apache.hive.jdbc.HiveDriver is needed) or
- Hive 1.2.1 is ODPi runtime compliant
- A JDBC driver compatible with Spark 2.3.3
- A Hive ODBC driver (ODBC driver for the client architecture and operating system needed)
A detailed instruction to connect to a Hive or SqlEndpoint endpoint with various tools (e.g. SQuirreL, beeline, SQL Developer, …) can be found at Apache HiveServer2 Clients.\ The multi platform database client DBeaver can connect to the SQLEndpoint out of the box.
Partitioning and merging of data sets¤
The execution on Spark is possible independent of the used file system as long as it can be referenced/is accessible for all cluster nodes. HDFS is recommended and the default settings are recommended for the best performance on a small cluster. Especially when working in local mode on the local file systems some problems can occur with the parallelism settings of Spark and the resulting partitioned output resources.
Problems can be avoided by changing the following are the default settings:
When running only locally the configuration should be like the following example (especially combineOutput
has to be true):