Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BXMSDOC-7030] AMQ Streams integration doc - 7.48.x #3189

Merged
merged 2 commits into from
Jan 25, 2021

Conversation

mramendi
Copy link
Contributor

  • AMQ Streams intergration doc - initial

  • creating new message/signal

  • Configuring the KIE server

  • Emitter and some work on custom task

  • Community build

  • Custom Task review, started Peer review

  • Peer review and final clarifications for Kafka integration

* AMQ Streams intergration doc - initial

* creating new message/signal

* Configuring the KIE server

* Emitter and some work on custom task

* Community build

* Custom Task review, started Peer review

* Peer review and final clarifications for Kafka integration
@sterobin
Copy link
Contributor

Jenkins retest this please

@sterobin sterobin merged commit 14d0c6d into apache:7.48.x Jan 25, 2021
Copy link
Contributor

@gmunozfe gmunozfe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments about missing configuration and boundary event node as consumer.

ifdef::PAM,DM[]
{KAFKA_PRODUCT}, based on Apache Kafka,
endif::PAM,DM[]
is a streaming platform. It passes messages, sorted into topics, between applications in a software environment.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"It acts as a message broker" sounds perhaps more accurate, but up to you


You can create business processes using {PRODUCT} that send and receive Kafka messages in the following ways:

* Create a start event or intermediate catch event of the type _message_ or _signal_. The {KIE_SERVER} automatically subsribes to the Kafka topic that is defined in the message or signal. A message triggers the event. The event can pass the content of the message to the subsequent node in the process.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing boundary event attached to human task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo "subsribes" -> subscribes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd mention that for these ones, these nodes act as a "Consumer", as it's the usual way to refer to these components "consuming messages" in Kafka world.


* Create a start event or intermediate catch event of the type _message_ or _signal_. The {KIE_SERVER} automatically subsribes to the Kafka topic that is defined in the message or signal. A message triggers the event. The event can pass the content of the message to the subsequent node in the process.

* Create an end event or intermediate throw event of the type _message_ or _signal_. When the process triggers the event, the {KIE_SERVER} sends a Kafka message in the topic that is defined in the message or signal. The message contains the data that is configured in the event.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This group of nodes are the "Producers"


* Create an end event or intermediate throw event of the type _message_ or _signal_. When the process triggers the event, the {KIE_SERVER} sends a Kafka message in the topic that is defined in the message or signal. The message contains the data that is configured in the event.

* Add the `KafkaPublishMessages` custom task to the process. This task does not require the {KIE_SERVER} Kafka capability but is significantly more complicated to configure than signal or message events.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that "significantly more complicated" is a subjective sentence, not sure if useful at this point. This is related to WorkItemHandler, that it's different from the Kafka-extension capability (2 previous ones).

xref:jBPMBPMN2[].
endif::JBPM,DROOLS,OP[]
. Select the business process and open the business process designer.
. Add a start event or an intermediate catch event of the type _message_ or _signal_.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing boundary event

. Select the business process and open the business process designer.
. Add a start event or an intermediate catch event of the type _message_ or _signal_.
. Open the properties of the event.
. In the *Message* or *Signal* field, select *New* and then enter the name of the message or signal. This name must be the same as the name of the topic from which the event is to receive Kafka messages, or else must be defined in an `org.kie.server.jbpm-kafka.ext.topics.*` system property of the {KIE_SERVER}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as a reader, it's not clear to me how it's the mapping in the system property. Asterisk should be replaced to [BROKER_TOPIC], and add an example if possible

. Select the business process and open the business process designer.
. Add an intermediate throw event or an end event of the type _message_ or _signal_.
. Open the properties of the event.
. In the *Message* or *Signal* field, select *New* and then enter the name of the message or signal. This name must be the same as the name of the topic to which the event is to send Kafka messages, or else must be defined in an `org.kie.server.jbpm-kafka.ext.topics.*` system property of the {KIE_SERVER}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above

[id='message-customtask-proc_{context}']
= Adding a custom task that sends Kafka messages

You can add a `KafkaPublishMessages` custom task to your process. This task sends Kafka messages. It does not use the {KIE_SERVER} Kafka capability, so you can use this task in processes that do not run on a {KIE_SERVER}. However, this task is more complicated to configure than other {KAFKA_PRODUCT} integration options.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"more complicated to configure than other" is a subjective sentence

** *Bootstrap Servers*: The host and port of the Kafka broker, for example, `localhost:9092`. You can use a comma-separated list of multiple host:port pairs.
** *Client ID*: An identifier string to pass to the broker when making requests. {KAFKA_PRODUCT} uses this string for logging.
** *Key Serializer class*: The class that provides the key serializer. Enter the standard serializer class name: `org.apache.kafka.common.serialization.StringSerializer`.
** *Value Serializer class*: The class that provides the value serializer. Enter the standard serializer class name: `org.apache.kafka.common.serialization.StringSerializer`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not mandatory, value can be any serializer class, even from the business application (kjar).
See this test:
https://github.com/kiegroup/jbpm-work-items/blob/master/jbpm-workitem-itests/src/test/java/org/jbpm/workitem/springboot/samples/KafkaSerializationTest.java

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option is to read the values from system properties, with the pattern env['property'].
See this configuration:
new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler(env['bootstrap.servers'], env['client.id'], env['key.serializer'], env['value.serializer'], classLoader)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing the classLoader information (I think it's also present at BC)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not able to work out full names of system properties from this configuration. Also, the work item can be used without the KIE Server, in an app that embeds the engine? Please clarify exactly which system properties one can set, and in which cases (KIE Server or Embedded) they will work. 

Also I do not have any classLoader information, it was not in the BAPL/RHPAM/JBPM Jiras and Enrique did not mention it. If you want me to include this information, please provide it.


.Procedure

. To enable integration with {KAFKA_PRODUCT}, set the `org.kie.kafka.server.ext.disabled` system property of the {KIE_SERVER} to `false`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true for kie-server deployed with wildfly/EAP. For Springboot engine, it's kieserver.kafka.enabled=true

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants