By: Julio Perera on August 21st, 2024
MAS deployment series: Configuring Kafka Queues in MAS Manage
This is a continuation of our series of blogs about deploying MAS. In this post, we describe how to configure Kafka Topics as Queues in MAS Manage. The configurations are described in general at the following URL: https://www.ibm.com/docs/en/mas-cd/maximo-manage/continuous-delivery?topic=applications-integration-apache-kafka. We assume that the Kafka Server has already been installed and is working in OpenShift. The installation currently supports either the AMQ Streams or the Strimzi operator, although we prefer Strimzi as it doesn’t require a license.
It is also worthwhile to note that there are other alternatives to Kafka Queues for integration messages in Manage, namely JMS via deployment of the related Server Bundle and post-deployment configuration, as described in https://www.ibm.com/docs/en/mas-cd/maximo-manage/continuous-delivery?topic=servers-configuring-jms-maximo-managemaximo-health. In general, JMS will perform better, but considering the amount of resources it consumes, as well as the added complexity of deploying, configuring, and maintaining a JMS solution, we wanted to provide a Kafka approach as well. Kafka can be scaled using multiple Partitions per Topic and as many Kafka replicas as needed to reach the desired message throughput. Also, Kafka is required for Monitor (and IoT), which creates and uses several Kafka Topics; it is therefore a little more future-proof than JMS in that regard.
Also, the documentation presented here is current at the time of its release, when the latest MAS Manage version is 9.0.43. Considering that the Manage side of the Kafka consumer implementation has changed in the past, there is no guarantee that it won’t change again in the future.
The first step, before creating and configuring the Kafka Topics, is to size them based on the expected volume of messages to be processed. In that regard, consuming simultaneously from the same Kafka Topic in multiple threads is accomplished by configuring more than one Partition in the Topic. Therefore, for Continuous Queues such as CQIN backed by Kafka Topics, we recommend a minimum of 3 Partitions, but more may need to be configured if messaging volume / throughput is high enough to warrant it. Also notice that the number of Partitions will impact the number of Replicas for the Kafka Cluster pods, as more replicas may be required to host more Partitions in case the existing replicas get overwhelmed. To raise the number of replicas, we can go into the Kafka CRD instance for our Kafka Cluster and modify the number of Brokers to more than 3 (the default for non-SNO deployments) as needed. See the below screenshots for details:
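As a minimal sketch, assuming a Strimzi-managed Kafka CRD named kafka-cluster in the common-kafka namespace (the broker count of 5 is illustrative only), the relevant fragment of the Kafka CRD would look like:
spec:
  kafka:
    # Number of Kafka Broker pods; 3 is the default for non-SNO deployments
    replicas: 5
It can be edited in place with, for example: oc edit kafka kafka-cluster -n common-kafka.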
The number of Kafka Brokers as per above will also allow specifying a higher Replication Factor for each Queue; see below for details.
To create the actual Topics that are going to host the integration messages, we first need to consider how many Manage Queues we need (and, as per above, how many Partitions for each).
Below are the standard Queues for a typical Manage installation: Continuous Inbound (CQIN), Continuous Outbound (CQOUT), Sequential Inbound (SQIN), and Sequential Outbound (SQOUT), each backed by its own Kafka Topic. A sample Topic definition follows:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: p01-manage-cqin
  namespace: common-kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  config:
    retention.ms: 604800000
    segment.bytes: 10485760
  partitions: 3
  replicas: 3
The above is a sample Kafka Topic definition, where the following can (and should) be adjusted as needed:
- name: Set to the name of the Topic, following a consistent naming convention (such as p01-manage-cqin in the sample above). There can be only one Topic by that name in the Kafka Cluster. This name is what we use in Maximo Manage to refer to the Topic.
- namespace: The Namespace/Project where the Kafka server is installed.
- strimzi.io/cluster: The Kafka Cluster Name that was installed as a pre-requisite.
- retention.ms: The number of milliseconds each Message/Post will last in the server. This is the only parameter that affects how long messages last and are stored in the Topic; there is no API or any other way of removing individual messages from a Topic, and they are removed only when they expire. In our case, we are using “604800000” as an example, which means “7 days” or “a week” (7 days × 24 hours × 60 minutes × 60 seconds × 1000 ms). Higher message retention means messages will be kept for longer on the underlying storage, but it also risks deletion of unconsumed messages if there is a problem with the queue, messages are not being consumed, and the retention period passes. This is especially important for Sequential Queues, where the Queue stops on error. Therefore, whatever the value, there is a tradeoff of space used vs. longevity, and periodic monitoring is required.
- segment.bytes: The size, in bytes, of each log segment file for the Topic. Because a message must fit within a single segment, this also effectively caps the maximum message size. In our case, we are using “10485760” as the default, equivalent to 10 MB. This value should be more than or equal to the mxe.kafka.messagesize system property, which sets the maximum message size that can be processed; configure the property to match the message size, in bytes, that you set on the Kafka server. The default is 10 MB when the System Property is left undefined. When you configure this limit, consider the size of your transactions, including the size of integrated attachments. Large message sizes can have a negative performance impact on the reading and writing of messages in the queue.
- partitions: The number of Partitions to configure for the Topic. This determines how many simultaneous consumers we can create in Manage (each will be a separate Cron Task instance) and therefore has a direct impact on throughput.
- replicas: The number of Replicas to keep for the Topic. This is related to, but not the same as, the number of Partitions (the number of Partitions can be higher than the number of Replicas). Each Partition will have the specified number of Replicas running simultaneously, and all Replicas store the same data for the Partition, so they can be used interchangeably for redundancy and load balancing purposes. However, the number of Replicas for a Topic cannot be higher than the Replication Factor (or number of Kafka Brokers) as per above. Every post to a Topic is replicated across all the Replicas, so write performance may be a reason to keep the number of Replicas to a minimum.
Once we have created all the Topics by submitting the appropriate YAML to the OpenShift Cluster as per above, we should get all the topics showing “Ready” as per the below screenshot:
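As a sketch, assuming one YAML file per Topic named after the convention above (the file names are hypothetical), the Topics can be created and verified as follows:
# Create one KafkaTopic per Manage Queue
oc apply -f p01-manage-cqin.yaml
oc apply -f p01-manage-cqout.yaml
oc apply -f p01-manage-sqin.yaml
oc apply -f p01-manage-sqout.yaml
# Verify that all Topics report READY = True
oc get kafkatopic -n common-kafka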
Once the topics have been created, we should gather the correct Kafka Server connection configuration; basically, we need to obtain:
A) Kafka Server URL also named “Bootstrap Server URL” (BOOTSTRAPSERVERS).
B) Security Protocol (SECURITY_PROTOCOL): We always use the value: “SASL_PLAINTEXT”
C) The SASL Mechanism (SASL_MECHANISM): We always use: “SCRAM-SHA-512”
D) The username (USERNAME): We always use “masuser”, but it may be different according to the Kafka server install.
E) The password for the above user (PASSWORD), which should have been defined at install time.
We are going to explain how to extract the above information from the running OpenShift Cluster.
A) To obtain the “BOOTSTRAPSERVERS” we can use the following oc command:
oc get -n <kafka-namespace> kafka <kafka-cluster-name> -ojsonpath='{.status.listeners[?(@.type=="plain")].bootstrapServers}'
and in our case, we got the following response (note the URL is internal and cannot be used to connect from outside of the OpenShift Cluster):
kafka-cluster-kafka-bootstrap.common-kafka.svc:9092
In case a value is not returned because the PLAIN Listener is not enabled/installed, we could either use the SSL Listener (which requires supplying the CA certificate to Manage, which we are trying to avoid) or define the PLAIN Listener in the Kafka Server by adding the following fragment to it:
spec:
  kafka:
    listeners:
      - authentication:
          type: scram-sha-512
        name: plain
        networkPolicyPeers:
          - namespaceSelector: {}
        port: 9092
        tls: false
        type: internal
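This fragment can be merged into the running Kafka CRD in place; as a sketch, assuming the cluster and namespace names used above:
oc edit kafka kafka-cluster -n common-kafka
The Strimzi operator should then roll the Kafka pods to apply the new listener.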
B) The SECURITY_PROTOCOL is a fixed value as noted above, so nothing needs to be extracted for it.
C) To obtain the SASL_MECHANISM we can use the following oc command:
oc get -n <kafka-namespace> kafka <kafka-cluster-name> -ojsonpath='{.spec.kafka.listeners[?(@.type=="internal")].authentication.type}'
We got the following response:
scram-sha-512
D) To obtain the USERNAME we can run the following oc command:
oc get kafkauser -n <kafka-namespace>
and in our case, we got the following response:
NAME         CLUSTER         AUTHENTICATION   AUTHORIZATION   READY
kafka-user   kafka-cluster   scram-sha-512    simple          True
E) To obtain the PASSWORD we can extract it from the user’s secret with the following oc command:
oc extract secret/<kafka-user> -n <kafka-namespace> --keys=password --to=-
Once we have all the above information, we should be ready to configure Manage to connect to the Kafka Server. To do so, log in to Manage as a user with enough privileges to configure External System information, go to the External Systems application, and from Select Action choose Add/Modify Message Providers. Then create a new record with the following information:
Name: KAFKA
Provider Type: KAFKA
Provider Type Description: Kafka Message Provider
Active: <checked>
And then, under Properties for the KAFKA provider, fill in the values that were retrieved in the previous step. Then press <OK> to accept. See below for a screenshot of how it looks (there are extra properties on page 2 which are not shown):
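For reference, a hypothetical set of provider property values, based on the sample values gathered above, might look like:
BOOTSTRAPSERVERS: kafka-cluster-kafka-bootstrap.common-kafka.svc:9092
SECURITY_PROTOCOL: SASL_PLAINTEXT
SASL_MECHANISM: SCRAM-SHA-512
USERNAME: masuser
PASSWORD: <password-extracted-above>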
Next, still in the External Systems application, from Select Action choose Add/Modify Queues, add a new Queue Definition by using the (+) sign, and choose “KAFKA” as the Message Provider. Then click on the <Loupe> icon next to the Queue JNDI Name field and a list of Kafka Topics should be presented. If the Topics are listed, then the connection to the Kafka Server using the previous configuration was successful. See below for a screenshot:
Next, continue filling in the rest of the information for the Continuous Outbound Queue (legacy CQOUT), see below for a sample:
Message Provider: KAFKA
Queue JNDI Name: <topic-name-for-cqout>
Sequential: <un-checked>
Inbound: <un-checked>
Maximum Try Count: 2
Multi Part?: <un-checked>
In our case, we used the following values as per the below screenshot:
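Assuming the Topic naming convention from the sample YAML above, the placeholder Queue JNDI Names used throughout this post would map, hypothetically, to:
<topic-name-for-cqin>  = p01-manage-cqin
<topic-name-for-cqout> = p01-manage-cqout
<topic-name-for-sqin>  = p01-manage-sqin
<topic-name-for-sqout> = p01-manage-sqout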
Continue filling in and adding the other Queues as shown below:
For the 'Inbound Continuous Queue':
Message Provider: KAFKA
Queue JNDI Name: <topic-name-for-cqin>
Sequential: <un-checked>
Inbound: <checked>
Maximum Try Count: 2
Multi Part?: <un-checked>
For the 'Inbound Sequential Queue':
Message Provider: KAFKA
Queue JNDI Name: <topic-name-for-sqin>
Sequential: <checked>
Inbound: <checked>
Maximum Try Count: 2
Multi Part?: <un-checked>
For the 'Outbound Sequential Queue':
Message Provider: KAFKA
Queue JNDI Name: <topic-name-for-sqout>
Sequential: <checked>
Inbound: <un-checked>
Maximum Try Count: 2
Multi Part?: <un-checked>
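In summary, the four standard Queue definitions look as follows (using the hypothetical Topic names from above):
Queue                         Queue JNDI Name     Sequential      Inbound
Continuous Outbound (CQOUT)   p01-manage-cqout    <un-checked>    <un-checked>
Continuous Inbound (CQIN)     p01-manage-cqin     <un-checked>    <checked>
Sequential Inbound (SQIN)     p01-manage-sqin     <checked>       <checked>
Sequential Outbound (SQOUT)   p01-manage-sqout    <checked>       <un-checked>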
Once the Queues are defined, press <OK> to close the dialog.
The next step is to define Cron Task instances to process messages for each of these Queues (and for each Partition of the Continuous Queues). To do so, go to the Cron Task Setup application and, from the list, look for the Kafka Cron Task; there should be four instances already defined.
Let’s rework the instances using the information below:
Cron Task Instance Name: kafkacqin-0
Schedule: 1m,*,*,*,*,*,*,*,*,*
Run as User: MAXADMIN
Active: <checked>
Keep History: <checked>
Max Number of History Records: 20
Feel free to define Schedule, Run as User, Keep History and Max Number of History Records as appropriate for your system.
And then under Cron Task Properties, the following:
MESSAGEHUB: KAFKA
MESSAGEPROCESSOR: psdi.iface.jms.QueueToMaximoProcessor
PARTITION: 0
TOPIC: <topic-name-for-cqin>
This will create an instance to process records for Partition 0 of the Continuous Inbound Queue (legacy CQIN).
Create additional instances for the other Partitions, such as “kafkacqin-1” and “kafkacqin-2”. Use the proper value for the PARTITION parameter, noting that Partition numbers start at zero, as illustrated below:
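For illustration, with the 3 Partitions configured above, the Continuous Inbound Queue would end up with three instances (the Topic name follows our hypothetical convention):
kafkacqin-0: PARTITION: 0, TOPIC: p01-manage-cqin
kafkacqin-1: PARTITION: 1, TOPIC: p01-manage-cqin
kafkacqin-2: PARTITION: 2, TOPIC: p01-manage-cqin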
Repeat for the other Queues (legacy SQIN, SQOUT, and CQOUT), remembering to set up one instance per Partition of each Queue. The SQINERR instance can be left inactive as it is deprecated.
Also, note that for Inbound Queues the proper value for MESSAGEPROCESSOR is “psdi.iface.jms.QueueToMaximoProcessor”, whereas for Outbound Queues it is “psdi.iface.jms.QueueToDestManagerProcessor”, as sketched below:
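For example, hypothetical Partition-0 instances for the remaining Queues might carry these properties (instance and Topic names follow our assumed convention):
kafkasqin-0:  MESSAGEHUB: KAFKA, MESSAGEPROCESSOR: psdi.iface.jms.QueueToMaximoProcessor,      PARTITION: 0, TOPIC: p01-manage-sqin
kafkasqout-0: MESSAGEHUB: KAFKA, MESSAGEPROCESSOR: psdi.iface.jms.QueueToDestManagerProcessor, PARTITION: 0, TOPIC: p01-manage-sqout
kafkacqout-0: MESSAGEHUB: KAFKA, MESSAGEPROCESSOR: psdi.iface.jms.QueueToDestManagerProcessor, PARTITION: 0, TOPIC: p01-manage-cqout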
Finally, we can define the instances of the KAFKAERROR Cron Task to process errors on both Continuous Queues (legacy CQIN and CQOUT). To do so, load the KAFKAERROR Cron Task and configure the instances as shown below:
Cron Task Instance Name: kafkaerrorcqin
Schedule: 1m,*,*,*,*,*,*,*,*,*
Run as User: MAXADMIN
Active: <checked>
Keep History: <checked>
Max Number of History Records: 20
Feel free to define Schedule, Run as User, Keep History and Max Number of History Records as appropriate for your system.
And then under Cron Task Properties, the following:
MESSAGEPROCESSOR: psdi.iface.jms.QueueToMaximoProcessor
TOPIC: <topic-name-for-cqin>
This will create an instance to process errored records for the whole Continuous Inbound Queue (legacy CQIN).
Then we repeat for the Continuous Outbound Queue, adding another instance with the following information:
Cron Task Instance Name: kafkaerrorcqout
Schedule: 1m,*,*,*,*,*,*,*,*,*
Run as User: MAXADMIN
Active: <checked>
Keep History: <checked>
Max Number of History Records: 20
Feel free to define Schedule, Run as User, Keep History and Max Number of History Records as appropriate for your system.
And then under Cron Task Properties, the following:
MESSAGEPROCESSOR: psdi.iface.jms.QueueToDestManagerProcessor
TOPIC: <topic-name-for-cqout>
This will create an instance to process errored records for the whole Continuous Outbound Queue (legacy CQOUT).
The above should complete the configuration to have Kafka as the Message Provider for Maximo integrations. Check the documentation links for additional details (including related System Properties).
References:
https://www.ibm.com/support/pages/how-configure-kafka-maximo-manage-ibm-cloud