Commit

[BXMSDOC-7030] AMQ Streams integration doc - 7.48.x (#3189)
* AMQ Streams integration doc - initial (#132)

* AMQ Streams integration doc - initial

* creating new message/signal

* Configuring the KIE server

* Emitter and some work on custom task

* Community build

* Custom Task review, started Peer review

* Peer review and final clarifications for Kafka integration

* Correct KAFKA_PRODUCT attribute
mramendi authored Jan 25, 2021
1 parent 69af8fc commit 14d0c6d
Showing 16 changed files with 215 additions and 0 deletions.
2 changes: 2 additions & 0 deletions _artifacts/document-attributes-dm.adoc
@@ -42,4 +42,6 @@
:CENTRAL_CAPITAL_UNDER: DECISION_CENTRAL
:CENTRAL_ONEWORD: decisioncentral

:KAFKA_PRODUCT: Red Hat AMQ Streams

:CONTACT: {PRODUCT} documentation team: [email protected]
2 changes: 2 additions & 0 deletions _artifacts/document-attributes-drools.adoc
@@ -41,3 +41,5 @@
:URL_COMPONENT_CENTRAL: business-central
:CENTRAL_CAPITAL_UNDER: BUSINESS_CENTRAL
:CENTRAL_ONEWORD: businesscentral

:KAFKA_PRODUCT: Apache Kafka
2 changes: 2 additions & 0 deletions _artifacts/document-attributes-jbpm.adoc
@@ -41,3 +41,5 @@
:URL_COMPONENT_CENTRAL: business-central
:CENTRAL_CAPITAL_UNDER: BUSINESS_CENTRAL
:CENTRAL_ONEWORD: businesscentral

:KAFKA_PRODUCT: Apache Kafka
2 changes: 2 additions & 0 deletions _artifacts/document-attributes-op.adoc
@@ -41,3 +41,5 @@
:URL_COMPONENT_CENTRAL: business-central
:CENTRAL_CAPITAL_UNDER: BUSINESS_CENTRAL
:CENTRAL_ONEWORD: businesscentral

:KAFKA_PRODUCT: Apache Kafka
2 changes: 2 additions & 0 deletions _artifacts/document-attributes-pam.adoc
@@ -42,4 +42,6 @@
:CENTRAL_CAPITAL_UNDER: BUSINESS_CENTRAL
:CENTRAL_ONEWORD: businesscentral

:KAFKA_PRODUCT: Red Hat AMQ Streams

:CONTACT: {PRODUCT} documentation team: [email protected]
2 changes: 2 additions & 0 deletions _artifacts/document-attributes.adoc
@@ -90,6 +90,7 @@ endif::OP[]
:CODEREADY_STUDIO: Red Hat CodeReady Studio
:CODEREADY_STUDIO_VERSION: 12.17
:CODEREADY_STUDIO_VERSION_LONG: 12.17.0
:AMQ_VERSION: 7.7
:DATAGRID: Red Hat Data Grid
:DATAGRID_VERSION: 7.3
:DATAGRID_VERSION_LONG: 7.3.6
@@ -149,6 +150,7 @@ endif::OP[]
:INTEGRATING_ENTANDO: Integrating {PRODUCT} assets in Entando AppBuilder
:INTEGRATING_SSO: Integrating {PRODUCT} with Red Hat Single Sign-On
:INTEGRATING_FUSE: Integrating {FUSE_LONG} with {PRODUCT}
:INTEGRATING_AMQ_STREAMS: Integrating {PRODUCT} with {KAFKA_PRODUCT}
:PLANNING_INSTALL: Planning a {PRODUCT} installation
:PATCHING_UPGRADING: Patching and upgrading {PRODUCT} {PRODUCT_VERSION}

24 changes: 24 additions & 0 deletions assemblies/assembly-integrating-amq-streams.adoc
@@ -0,0 +1,24 @@
[id='assembly-integrating-amq-streams']
= Integrating {PRODUCT} with {KAFKA_PRODUCT}
ifdef::context[:parent-context: {context}]
// Context attribute is assembly specific and enables module reuse between assemblies.
:context: integrating-amq-streams

// Purpose statement for the assembly
As a developer, you can integrate {PRODUCT} with Red Hat AMQ Streams or Apache Kafka. A business process can send and receive Kafka messages.

include::{jbpm-dir}/Kafka/integration-kafka-con.adoc[leveloffset=+1]
include::{jbpm-dir}/Kafka/message-receive-event-proc.adoc[leveloffset=+2]
include::{jbpm-dir}/Kafka/message-send-event-proc.adoc[leveloffset=+2]
include::{jbpm-dir}/Kafka/message-customtask-proc.adoc[leveloffset=+2]
include::{jbpm-dir}/Kafka/kieserver-kafka-proc.adoc[leveloffset=+1]
include::{jbpm-dir}/Kafka/kieserver-kafka-emit-proc.adoc[leveloffset=+1]


== Additional resources

* https://access.redhat.com/documentation/en-us/red_hat_amq/{AMQ_VERSION}/html/using_amq_streams_on_openshift/[_Using AMQ Streams on OpenShift_]
* https://access.redhat.com/documentation/en-us/red_hat_amq/{AMQ_VERSION}/html/using_amq_streams_on_rhel/[_Using AMQ Streams on RHEL_]

ifdef::parent-context[:context: {parent-context}]
ifndef::parent-context[:!context:]
@@ -0,0 +1,21 @@
[id='integration-kafka-con_{context}']
= Kafka messages in a business process

ifdef::JBPM,DROOLS,OP[]
Apache Kafka
endif::JBPM,DROOLS,OP[]
ifdef::PAM,DM[]
{KAFKA_PRODUCT}, based on Apache Kafka,
endif::PAM,DM[]
is a streaming platform. It passes messages, organized into topics, between applications in a software environment.

You can create business processes using {PRODUCT} that send and receive Kafka messages in the following ways:

* Create a start event or intermediate catch event of the type _message_ or _signal_. The {KIE_SERVER} automatically subscribes to the Kafka topic that is defined in the message or signal. A message triggers the event. The event can pass the content of the message to the subsequent node in the process.

* Create an end event or intermediate throw event of the type _message_ or _signal_. When the process triggers the event, the {KIE_SERVER} sends a Kafka message in the topic that is defined in the message or signal. The message contains the data that is configured in the event.

* Add the `KafkaPublishMessages` custom task to the process. This task does not require the {KIE_SERVER} Kafka capability but is significantly more complicated to configure than signal or message events.

* Configure your service and the {KIE_SERVER} to emit Kafka messages about every completed process, case, and task when transactions are committed.

@@ -0,0 +1,57 @@
[id='kieserver-kafka-emit-proc_{context}']
= Configuring a service and the {KIE_SERVER} to emit Kafka messages when a transaction is committed

You can configure the {KIE_SERVER} to emit a Kafka message every time a process, case, or task is completed. The {KIE_SERVER} sends the messages when it commits transactions.

You can use this functionality with any business process or case. You do not need to change anything in the process design.

This configuration is also available if you run your process service using Spring Boot.

By default, the {KIE_SERVER} publishes the messages in the following topics:

* `jbpm-processes-events` for messages about completed processes
* `jbpm-tasks-events` for messages about completed tasks
* `jbpm-cases-events` for messages about completed cases

You can configure the topic names.

The published messages comply with the https://github.com/cloudevents/spec[CloudEvents specification] version 1.0. Each message contains the following fields:

* `id`: The unique identifier of the event
* `type`: The type of the event (process, task, or case)
* `source`: The event source as a URI
* `time`: The timestamp of the event, by default in the https://tools.ietf.org/html/rfc3339[RFC3339] format
* `data`: Information about the process, case, or task, presented in a JSON format
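
For illustration, a process event message might look like the following example. All field values here are hypothetical, and the exact structure of the `data` field depends on the process:

[source,json]
----
{
  "id": "86e3bbfa-2b30-40b3-9e1b-6c5403fb2e31",
  "type": "process",
  "source": "/process/evaluation/1",
  "time": "2021-01-25T10:15:30.000+0000",
  "data": {
    "process-instance-id": 1,
    "process-id": "evaluation",
    "state": 2
  }
}
----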

.Procedure

. To enable emitting Kafka messages, complete one of the following steps:
.. If you deployed the {KIE_SERVER} on {EAP} or another application server:
ifdef::PAM,DM[]
... Download the `{PRODUCT_FILE}-maven-repository.zip` product deliverable file from the {PRODUCT_DOWNLOAD_LINK}[Software Downloads] page of the Red Hat Customer Portal.
... Extract the contents of the file.
... Copy the `maven-repository/org/jbpm/jbpm-event-emitters-kafka/{MAVEN_ARTIFACT_VERSION}/jbpm-event-emitters-kafka-{MAVEN_ARTIFACT_VERSION}.jar` file into the `deployments/kie-server.war/WEB-INF/lib` subdirectory of the application server.
endif::PAM,DM[]
ifdef::JBPM,DROOLS,OP[]
... Retrieve the `org.jbpm.jbpm-event-emitters-kafka` JAR file version `{MAVEN_ARTIFACT_VERSION}` from the public Maven repository.
... Copy the file into the `deployments/kie-server.war/WEB-INF/lib` subdirectory of the application server.
endif::JBPM,DROOLS,OP[]
+
.. If you deployed the application using Spring Boot, add the following lines to the `<dependencies>` list in the `pom.xml` file of your service:
+
[source,xml]
----
<dependency>
<groupId>org.jbpm</groupId>
<artifactId>jbpm-event-emitters-kafka</artifactId>
<version>${version.org.kie}</version>
</dependency>
----
+
. Configure any of the following system properties for the {KIE_SERVER} as necessary:
* `org.kie.jbpm.event.emitters.kafka.bootstrap.servers`: The host and port of the Kafka broker. The default value is `localhost:9092`. You can use a comma-separated list of multiple host:port pairs.
* `org.kie.jbpm.event.emitters.kafka.date_format`: The timestamp format for the `time` field of the messages. The default value is `yyyy-MM-dd'T'HH:mm:ss.SSSZ`.
* `org.kie.jbpm.event.emitters.kafka.topic.processes`: The topic name for messages about completed processes. The default value is `jbpm-processes-events`.
* `org.kie.jbpm.event.emitters.kafka.topic.cases`: The topic name for messages about completed cases. The default value is `jbpm-cases-events`.
* `org.kie.jbpm.event.emitters.kafka.topic.tasks`: The topic name for messages about completed tasks. The default value is `jbpm-tasks-events`.
* `org.kie.jbpm.event.emitters.kafka.client.id`: An identifier string to pass to the server when making requests. The server uses this string for logging.
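
For example, if you run the {KIE_SERVER} on {EAP}, you can pass the emitter properties as JVM options in the server startup command. The following sketch is illustrative only; the broker addresses, topic name, and client identifier are assumptions:

[source,shell]
----
# Collect the emitter settings in a shell variable (all values are examples).
EMITTER_OPTS="-Dorg.kie.jbpm.event.emitters.kafka.bootstrap.servers=broker1:9092,broker2:9092"
EMITTER_OPTS="$EMITTER_OPTS -Dorg.kie.jbpm.event.emitters.kafka.topic.processes=custom-processes-events"
EMITTER_OPTS="$EMITTER_OPTS -Dorg.kie.jbpm.event.emitters.kafka.client.id=my-kie-server"
echo "$EMITTER_OPTS"

# Start the application server with the collected options, for example:
# ./standalone.sh $EMITTER_OPTS
----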
@@ -0,0 +1,19 @@
[id='kieserver-kafka-proc_{context}']
= Configuring a {KIE_SERVER} to send and receive Kafka messages from the process

To run a process that sends or receives Kafka messages (except when using the custom task), you must use a {KIE_SERVER}. You must configure this {KIE_SERVER} to integrate with {KAFKA_PRODUCT}.

.Procedure

. To enable integration with {KAFKA_PRODUCT}, set the `org.kie.kafka.server.ext.disabled` system property of the {KIE_SERVER} to `false`.
. To configure the connection to the Kafka broker, set the `org.kie.server.jbpm-kafka.ext.bootstrap.servers` system property to the host and port of the broker. The default value is `localhost:9092`. You can use a comma-separated list of multiple host:port pairs.
. Optional: Set any of the following system properties to configure sending and receiving Kafka messages:
** `org.kie.server.jbpm-kafka.ext.client.id`: An identifier string to pass to the broker when making requests. {KAFKA_PRODUCT} uses this string for logging.
** `org.kie.server.jbpm-kafka.ext.topics.*`: Mapping of message or signal names to topic names. For example, if you want to send or receive a message in the `ExampleTopic` topic when `ExampleName` is the name of the message or signal, set the `org.kie.server.jbpm-kafka.ext.topics.ExampleName` system property to `ExampleTopic`. You can set any number of such system properties. If a message or signal name is not mapped using a system property, the {PROCESS_ENGINE} uses this name as the topic name.
. Optional: Set any of the following system properties to configure receiving Kafka messages:
** `org.kie.server.jbpm-kafka.ext.allow.auto.create.topics`: Allow automatic topic creation. Enabled by default.
** `org.kie.server.jbpm-kafka.ext.group.id`: A unique string that identifies the group to which this Kafka message consumer belongs. The default value is `jbpm-consumer`.
. Optional: Set any of the following system properties to configure sending Kafka messages:
** `org.kie.server.jbpm-kafka.ext.acks`: The number of acknowledgements that the Kafka leader must receive before marking the request as complete. The default value is `1`, which means the leader writes the record to its local log and then responds to the {PROCESS_ENGINE}, without waiting for full acknowledgement from all followers.
** `org.kie.server.jbpm-kafka.ext.max.block.ms`: The number of milliseconds for which the publish method blocks. After this time, the {PROCESS_ENGINE} can resume execution of the business process. The default value is `2000` (2 seconds).
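
The properties above can be combined into a single set of JVM options. The following is a minimal sketch, assuming a local broker and a message or signal named `ExampleName` that maps to the `ExampleTopic` topic:

[source,shell]
----
# Enable the Kafka extension and configure the connection (values are examples).
KAFKA_OPTS="-Dorg.kie.kafka.server.ext.disabled=false"
KAFKA_OPTS="$KAFKA_OPTS -Dorg.kie.server.jbpm-kafka.ext.bootstrap.servers=localhost:9092"
KAFKA_OPTS="$KAFKA_OPTS -Dorg.kie.server.jbpm-kafka.ext.topics.ExampleName=ExampleTopic"
echo "$KAFKA_OPTS"

# Start the application server with the collected options, for example:
# ./standalone.sh $KAFKA_OPTS
----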

@@ -0,0 +1,22 @@
[id='message-customtask-proc_{context}']
= Adding a custom task that sends Kafka messages

You can add a `KafkaPublishMessages` custom task to your process. This task sends Kafka messages. It does not use the {KIE_SERVER} Kafka capability, so you can use this task in processes that do not run on a {KIE_SERVER}. However, this task is more complicated to configure than other {KAFKA_PRODUCT} integration options.

.Procedure

. In the {CENTRAL} administrative settings menu, as the administrative user, select *Custom Tasks Administration*.
. Ensure that *KafkaPublishMessages* is set to *On*.
. In {CENTRAL}, select *Menu* -> *Design* -> *Projects* and then click the space name and the project name.
. Select the *Settings* -> *Custom Tasks* tab.
. In the *KafkaPublishMessages* line, click *Install*.
. Enter the following information:
** *Bootstrap Servers*: The host and port of the Kafka broker, for example, `localhost:9092`. You can use a comma-separated list of multiple host:port pairs.
** *Client ID*: An identifier string to pass to the broker when making requests. {KAFKA_PRODUCT} uses this string for logging.
** *Key Serializer class*: The class that provides the key serializer. Enter the standard serializer class name: `org.apache.kafka.common.serialization.StringSerializer`.
** *Value Serializer class*: The class that provides the value serializer. Enter the standard serializer class name: `org.apache.kafka.common.serialization.StringSerializer`.
. Click the *Assets* tab.
. Select the business process and open the business process designer.
. Add the `KafkaPublishMessages` custom task, available under *Custom Tasks* in the BPMN modeler palette.
. In the properties of the custom task, open the data assignments.
. Assign the *Key*, *Topic*, and *Value* inputs to define the message.
@@ -0,0 +1,23 @@
[id='message-receive-event-proc_{context}']
= Creating an event that receives Kafka messages

When designing your business process in {CENTRAL}, you can create an event that receives Kafka messages.

This event is triggered each time a message arrives in the configured topic. The message is expected to contain data that matches a predefined data object. The {PROCESS_ENGINE} parses the message and provides it as an output of the event.

.Procedure

. Open the project that contains your business process in {CENTRAL}.
. Create a data object defining the data that the message will contain. For instructions about creating data objects, see
ifdef::PAM,DM[]
{URL_DEVELOPING_PROCESS_SERVICES}#assembly-designing-business-processes[_{DESIGNING_BUSINESS_PROCESSES}_].
endif::PAM,DM[]
ifdef::JBPM,DROOLS,OP[]
xref:jBPMBPMN2[].
endif::JBPM,DROOLS,OP[]
. Select the business process and open the business process designer.
. Add a start event or an intermediate catch event of the type _message_ or _signal_.
. Open the properties of the event.
. In the *Message* or *Signal* field, select *New* and then enter the name of the message or signal. This name must be the same as the name of the topic from which the event is to receive Kafka messages, or else must be defined in an `org.kie.server.jbpm-kafka.ext.topics.*` system property of the {KIE_SERVER}.
. Add an output data item. Select the data object that you created as its type.
. Save the business process.
@@ -0,0 +1,23 @@
[id='message-send-event-proc_{context}']
= Creating an event that sends Kafka messages

When designing your business process in {CENTRAL}, you can create an event that sends Kafka messages.

The event can have a data object as an input data item. The {PROCESS_ENGINE} sends the content of a data object as a message in the configured topic.

.Procedure

. Open the project that contains your business process in {CENTRAL}.
. Create a data object defining the data that the message must contain. For instructions about creating data objects, see
ifdef::PAM,DM[]
{URL_DEVELOPING_PROCESS_SERVICES}#assembly-designing-business-processes[_{DESIGNING_BUSINESS_PROCESSES}_].
endif::PAM,DM[]
ifdef::JBPM,DROOLS,OP[]
xref:jBPMBPMN2[].
endif::JBPM,DROOLS,OP[]
. Select the business process and open the business process designer.
. Add an intermediate throw event or an end event of the type _message_ or _signal_.
. Open the properties of the event.
. In the *Message* or *Signal* field, select *New* and then enter the name of the message or signal. This name must be the same as the name of the topic to which the event is to send Kafka messages, or else must be defined in an `org.kie.server.jbpm-kafka.ext.topics.*` system property of the {KIE_SERVER}.
. Add an input data item. Select the data object that you created as its type.
. Save the business process.
@@ -0,0 +1,9 @@
[[KafkaIntegration]]
= Integration with Apache Kafka

include::Kafka/integration-kafka-con.adoc[leveloffset=+1]
include::Kafka/message-receive-event-proc.adoc[leveloffset=+1]
include::Kafka/message-send-event-proc.adoc[leveloffset=+1]
include::Kafka/message-customtask-proc.adoc[leveloffset=+1]
include::Kafka/kieserver-kafka-proc.adoc[leveloffset=+1]
include::Kafka/kieserver-kafka-emit-proc.adoc[leveloffset=+1]
2 changes: 2 additions & 0 deletions doc-content/jbpm-docs/src/main/asciidoc/index.adoc
@@ -78,6 +78,8 @@ include::EclipseModeler-chapter.adoc[leveloffset=+1]
Integrating {PRODUCT} with other technologies, frameworks, etc.

include::Integration-chapter.adoc[leveloffset=+1]
include::KafkaIntegration-chapter.adoc[leveloffset=+1]


[discrete]
= Advanced Topics
3 changes: 3 additions & 0 deletions titles-enterprise/integrating/title.adoc
@@ -15,6 +15,9 @@ include::assemblies/assembly-springboot-business-apps.adoc[]
include::assemblies/assembly-integrating-fuse.adoc[]
include::assemblies/assembly-integrating-sso.adoc[]
include::assemblies/assembly-integrating-codeready-studio.adoc[]
ifdef::PAM[]
include::assemblies/assembly-integrating-amq-streams.adoc[]
endif::PAM[]

// Versioning info
include::_artifacts/versioning-information.adoc[]
