Apache Camel - basics
Apache Camel integrations are defined within your application's jvm/app-name-script-config/app-name-camel.kts file.
A simple Camel configuration
Here is a simple example. It defines a single route using a range of Camel configuration options, which we'll explore in a little more detail below:
camel {
    routeHandler {
        val pathStr = "${GenesisPaths.genesisHome()}/runtime/inbound/"
        from("file:${pathStr}/exampledir/?move=.camel/\${date:now:yyyyMMdd-HHmmssSSS}-\${headers.CamelFileName}&initialDelay=5000&readLock=changed&readLockCheckInterval=5000&readLockTimeout=60000")
            .process(fileEventProcessorProvider.createProcessor("EXAMPLE_EVENT_HANDLER", "EVENT_FILE_IMPORT_EXAMPLE", "FILE", "SOURCE_NAME"))
    }
}
routeHandler
The routeHandler defines the possible routes for information to flow into and out of our system. The example above defines one route.
- First, it defines the pathStr using the GenesisPaths class to find the GENESIS_HOME system environment variable.
- Next, it defines the route itself. The route in the example comes from the filesystem, as determined by the file: specifier at the start of the string. This could be any Apache Camel component that can act as a consumer.
In this instance, the File component (defined in the File component documentation) accepts a path parameter and a number of query parameters, some of which we are using:
- ${pathStr}/exampledir/ is the mandatory directoryName path parameter. It indicates where in the file system Apache Camel should look for files.
- move=.camel/\${date:now:yyyyMMdd-HHmmssSSS}-\${headers.CamelFileName} is the move query parameter. This determines where the files found in the directory are moved to once they have been processed.
- initialDelay=5000 is the initialDelay query parameter. This is the time in milliseconds before the system first polls the directory.
- readLock=changed is the readLock query parameter. This sets the strategy used to ensure that the file being polled isn't in use.
- readLockCheckInterval=5000 is the readLockCheckInterval query parameter. This sets the time in milliseconds between checks of the read lock.
- readLockTimeout=60000 is the readLockTimeout query parameter. This sets the maximum time in milliseconds that Apache Camel will wait when trying to acquire a read lock before timing out.
Both $ signs in the move parameter are escaped (\$) to prevent Kotlin from treating them as string templates and injecting a variable into the string. We want these expressions to be resolved by Apache Camel itself rather than by Kotlin.
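To see the effect of the escaping, here is a minimal Kotlin sketch that assembles a File endpoint URI from a directory and a map of options. The helper name fileEndpointUri and the /opt/genesis path are hypothetical, chosen for illustration only; they are not part of Genesis or Apache Camel.

```kotlin
// Hypothetical helper: joins a directory and query options into a Camel File endpoint URI.
fun fileEndpointUri(directory: String, options: Map<String, String>): String {
    val query = options.entries.joinToString("&") { (key, value) -> "$key=$value" }
    return "file:$directory?$query"
}

fun main() {
    // Assumed path for illustration; in the real script it comes from GenesisPaths.genesisHome().
    val pathStr = "/opt/genesis/runtime/inbound"

    val uri = fileEndpointUri(
        "$pathStr/exampledir/", // unescaped $: Kotlin substitutes pathStr here
        mapOf(
            // escaped \$: Kotlin leaves ${...} in place for Camel to evaluate later
            "move" to ".camel/\${date:now:yyyyMMdd-HHmmssSSS}-\${headers.CamelFileName}",
            "initialDelay" to "5000",
            "readLock" to "changed",
        ),
    )
    println(uri)
}
```

Running this prints file:/opt/genesis/runtime/inbound/exampledir/?move=.camel/${date:now:yyyyMMdd-HHmmssSSS}-${headers.CamelFileName}&initialDelay=5000&readLock=changed. The path has been filled in by Kotlin, while the two ${...} expressions survive untouched for Apache Camel to resolve per file.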
The route then processes the file using the FileEventHandlerProcessor created by the createProcessor method through the fileEventProcessorProvider. Similar classes and methods exist for processing Kafka messages and CSV files.
This processing will then send an EVENT_FILE_IMPORT_EXAMPLE message to the EXAMPLE_EVENT_HANDLER process, with a dataFieldName of FILE, expecting it to be handled by an appropriate requestReply in your Request Server.
createProcessor
The createProcessor method on both the FileEventProcessorProvider and the KafkaEventProcessorProvider takes the same parameters. However, the createProcessor method on the CSVEventProcessorProvider takes slightly different parameters.
The parameters for createProcessor on the FileEventProcessorProvider and KafkaEventProcessorProvider are:
- processName is the name of the process to which you are sending a request.
- messageType is the type of message sent to the above process. This is important for ensuring that the correct requestReply in your Request Server handles this request.
- dataFieldName is the name of the parameter that contains the data of the file or Kafka events when sent to the Request Server.
- sourceId is the source of this request.
- replyCallback is an optional parameter. It allows you to define a custom Consumer and its behaviour on the response of the requestReply. If you do not set this parameter, a default consumer is constructed that logs EVENT_NACK messages from the requestReply, with the request set.
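The default behaviour described for replyCallback can be pictured with a small, self-contained sketch. It assumes a hypothetical SketchReply type in place of the real reply type, which is not shown in this document, and it reads "with the request set" as meaning that the log entry includes the request. It illustrates the idea of a Consumer that only reacts to EVENT_NACK; it is not the Genesis implementation.

```kotlin
import java.util.function.Consumer

// Hypothetical stand-in for a requestReply response; the real Genesis type is not shown here.
data class SketchReply(val messageType: String, val request: String)

// Collects log lines so the behaviour is easy to inspect; a real callback would use a logger.
val loggedLines = mutableListOf<String>()

// Mirrors the described default: only EVENT_NACK replies are logged, along with the request.
val nackLoggingCallback = Consumer<SketchReply> { reply ->
    if (reply.messageType == "EVENT_NACK") {
        loggedLines += "NACK received for request: ${reply.request}"
    }
}

fun main() {
    nackLoggingCallback.accept(SketchReply("EVENT_ACK", "file-1.csv"))  // ignored
    nackLoggingCallback.accept(SketchReply("EVENT_NACK", "file-2.csv")) // logged
    println(loggedLines)
}
```

A custom callback would follow the same shape, replacing the body of the lambda with whatever handling you need, for example raising an alert on a NACK.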
The parameters for createProcessor on the CSVEventProcessorProvider are:
- processName is the name of the process to which you are sending a request.
- messageType is the type of message sent to the above process. This is important for ensuring that the correct requestReply in your Request Server handles this request.
- sourceId is the source of this request.
- separator allows you to define which character is used as a separator.
- ignoreHeaders is an optional Boolean parameter, defaulting to false. If set to true, the first row of the CSV will be ignored, and not parsed as part of the CSV.
- replyCallback is an optional parameter. It allows you to define a custom Consumer and its behaviour on the response of the requestReply. If you do not set this parameter, a default consumer is constructed that logs EVENT_NACK messages from the requestReply, with the request set.
- charset is an optional parameter, defaulting to UTF-8. This defines which character set the CSV is using.
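To make the roles of separator, ignoreHeaders and charset concrete, here is a deliberately naive Kotlin sketch. The function parseCsv is hypothetical and only borrows the parameter names listed above; it does not handle quoted fields and says nothing about how the CSVEventProcessorProvider is actually implemented.

```kotlin
import java.nio.charset.Charset

// Illustrative only: a naive reader showing what separator, ignoreHeaders and charset control.
fun parseCsv(
    bytes: ByteArray,
    separator: Char,
    ignoreHeaders: Boolean = false,
    charset: Charset = Charsets.UTF_8,
): List<List<String>> {
    // charset decides how the raw bytes are decoded into text
    val lines = String(bytes, charset).lines().filter { it.isNotBlank() }
    // ignoreHeaders drops the first row instead of parsing it as data
    val dataLines = if (ignoreHeaders) lines.drop(1) else lines
    // separator decides where each row is split into fields
    return dataLines.map { it.split(separator) }
}

fun main() {
    val csv = "ID;NAME\n1;Alice\n2;Bob\n".toByteArray(Charsets.ISO_8859_1)
    val rows = parseCsv(csv, separator = ';', ignoreHeaders = true, charset = Charsets.ISO_8859_1)
    println(rows) // [[1, Alice], [2, Bob]]
}
```

With ignoreHeaders left at its default of false, the same input would yield three rows, the first being the header row.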
If you want to ingest a CSV directly into GenesisDB rather than calling a Request Server, you might want to investigate the Genesis Data Pipeline batch pipeline first.