This is a continuation of our series of blog posts about the deployment of MAS. In this post, we describe how to configure Kafka Topics as Queues in MAS Manage. The configuration is described in general at the following URL: https://www.ibm.com/docs/en/mas-cd/maximo-manage/continuous-delivery?topic=applications-integration-apache-kafka. We assume that the Kafka Server has already been installed and is working in OpenShift. The installation currently supports either the AMQ Streams or the Strimzi operator, although we prefer Strimzi as it doesn’t require a license.
It is also worth noting that there is an alternative to Kafka Queues for integration messages in Manage, namely JMS, via deployment of the related Server Bundle and the post-deployment configuration described in https://www.ibm.com/docs/en/mas-cd/maximo-manage/continuous-delivery?topic=servers-configuring-jms-maximo-managemaximo-health. In general, JMS performs better, but given the amount of resources it consumes and the added complexity of deploying, configuring, and maintaining a JMS solution, we wanted to provide a Kafka approach as well. Kafka can be scaled by using multiple Partitions per Topic and as many Kafka replicas as needed to reach the desired message throughput. Also, Kafka is required for Monitor (and IoT), which creates and uses several Kafka Topics, so it is a little more future proof than JMS in that regard.
Also, the documentation presented here is current at the time of its release, when the latest MAS Manage version is 9.0.43. Considering that the Manage side of the Kafka consumer implementation has changed in the past, there is no guarantee that it won’t change again in the future.
The first step, before creating and configuring the Kafka Topics, is to size them based on the expected volume of messages to be processed. Consuming from the same Kafka Topic simultaneously in multiple threads is accomplished by configuring more than one Partition in the Topic. Therefore, for Continuous Queues such as CQIN backed by Kafka Topics, we recommend a minimum of 3 partitions; more may need to be configured if messaging volume / throughput is high enough to warrant it. Also note that the number of partitions will impact the number of Replicas for the Kafka Cluster pods, as more replicas may be required to host more partitions if the existing replicas get overwhelmed. To raise the number of replicas, we can go into the Kafka CRD instance for our Kafka Cluster and increase the number of Brokers beyond 3 (the default for non-SNO deployments) as needed. See the below screenshots for details:
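As a minimal sketch (assuming a Strimzi-managed cluster named kafka-cluster in the common-kafka namespace, matching the topic definitions later in this post), the broker count lives under spec.kafka.replicas in the Kafka custom resource:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: common-kafka
spec:
  kafka:
    # Raise beyond the default of 3 if the partition load warrants it
    replicas: 5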
The number of Kafka Brokers, as per above, will also allow specifying a higher Replication Factor in each queue; see below for details.
To create the actual Topics that are going to host the integration messages, we first need to consider how many Manage Queues we need (and, as above, how many Partitions each one needs).
Below is a Topic definition for one of the standard Queues (CQIN) of a typical Manage installation:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: p01-manage-cqin
  namespace: common-kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  config:
    retention.ms: 604800000
    segment.bytes: 10485760
  partitions: 3
  replicas: 3
The above is a sample Kafka Topic definition, where the following can (and should) be adjusted as needed: the Topic name, the namespace and strimzi.io/cluster label (to match your Kafka installation), the number of partitions and replicas (per the sizing discussion above), and the retention.ms / segment.bytes settings.
Once we have created all the Topics by submitting the appropriate YAML to the OpenShift Cluster as per above, all the topics should show “Ready” as per the below screenshot:
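If you prefer the CLI to the console, a quick way to submit and verify the topics (assuming the definitions are saved in a local file named manage-topics.yaml, which is our naming, not a requirement) is:

oc apply -f manage-topics.yaml
oc get kafkatopic -n common-kafka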
Once the topics have been created, we should gather the correct Kafka Server configuration. Basically, we need to obtain:
A) The Kafka Server URL, also known as the “Bootstrap Server URL” (BOOTSTRAPSERVERS).
B) The Security Protocol (SECURITY_PROTOCOL): we always use the value “SASL_PLAINTEXT”.
C) The SASL Mechanism (SASL_MECHANISM): we always use “SCRAM-SHA-512”.
D) The username (USERNAME): we always use “masuser”, but it may be different according to the Kafka server install.
E) The password for the above user (PASSWORD), which should have been defined at install time.
We are going to explain how to extract the above information from the running OpenShift Cluster.
A) To obtain the “BOOTSTRAPSERVERS” we can use the following oc command:
oc get -n <kafka-namespace> kafka <kafka-cluster-name> -o jsonpath='{.status.listeners[?(@.type=="plain")].bootstrapServers}'
and in our case, we got the following response (note the URL is internal and cannot be used to connect from outside of the OpenShift Cluster):
kafka-cluster-kafka-bootstrap.common-kafka.svc:9092
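As a quick sanity check (assuming the cluster and namespace above), you can confirm the Service behind that URL exists, since Strimzi creates a <cluster-name>-kafka-bootstrap Service:

oc get svc kafka-cluster-kafka-bootstrap -n common-kafka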
If no value is returned because the PLAIN Listener is not enabled/installed, we could either use the SSL Listener (which requires submitting the CA certificate to Manage, which we are trying to avoid) or define the PLAIN Listener in the Kafka Server by adding the following fragment to it:
spec:
  kafka:
    listeners:
      - authentication:
          type: scram-sha-512
        name: plain
        networkPolicyPeers:
          - namespaceSelector: {}
        port: 9092
        tls: false
        type: internal
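One way to apply this fragment is to edit the Kafka custom resource in place (using the same placeholders as before) and append the listener to the existing spec.kafka.listeners array:

oc edit kafka <kafka-cluster-name> -n <kafka-namespace>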
C) To confirm the SASL_MECHANISM we can run the following oc command:
oc get -n <kafka-namespace> kafka <kafka-cluster-name> -o jsonpath='{.spec.kafka.listeners[?(@.type=="internal")].authentication.type}'
We got the following response:
scram-sha-512
D) To obtain the USERNAME we can run the following oc command:
oc get kafkauser -n <kafka-namespace>
and in our case, we got the following response:
NAME         CLUSTER         AUTHENTICATION   AUTHORIZATION   READY
kafka-user   kafka-cluster   scram-sha-512    simple          True
E) To obtain the PASSWORD we can run the following oc command (using the KafkaUser name from the previous step):
oc extract secret/<kafka-user> -n <kafka-namespace> --keys=password --to=-
Once we have all the above information, we should be ready to configure Manage to connect to the Kafka Server. To do so, log in to Manage as a user with enough privileges to configure External System information, go to the External Systems application, and from Select Action choose Add/Modify Message Providers. Then create a new record with the following information:
Name: KAFKA
Provider Type: KAFKA
Provider Type Description: Kafka Message Provider
Active: <checked>
Then, under Properties for the KAFKA provider, fill in the values retrieved in the previous step and press <OK> to accept. See below for a screenshot of how it looks (there are extra properties on page 2 which are not shown):
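For reference, the property values gathered in the previous steps map onto the provider as follows (the username and password come from your own KafkaUser and its secret):

BOOTSTRAPSERVERS: kafka-cluster-kafka-bootstrap.common-kafka.svc:9092
SECURITY_PROTOCOL: SASL_PLAINTEXT
SASL_MECHANISM: SCRAM-SHA-512
USERNAME: <kafka-user-name-from-step-D>
PASSWORD: <password-extracted-from-the-secret>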
Next, still in the External Systems application, from Select Action choose Add/Modify Queues, then add a new Queue Definition using the (+) sign and choose “KAFKA” as the Message Provider. Then click the <Loupe> icon next to the Queue JNDI Name field and a list of Kafka Topics should be presented. If the topics are listed, then the connection to the Kafka Server using the previous configuration was successful. See below for a screenshot:
Next, continue filling in the rest of the information for the Continuous Outbound Queue (legacy CQOUT); see below for a sample:
Message Provider: KAFKA
Queue JNDI Name: <topic-name-for-cqout>
Sequential: <un-checked>
Inbound: <un-checked>
Maximum Try Count: 2
Multi Part?: <un-checked>
In our case, we used the following values as per the below screenshot:
Continue filling in and adding the other Queues as shown below:
For the 'Inbound Continuous Queue':
Message Provider: KAFKA
Queue JNDI Name: <topic-name-for-cqin>
Sequential: <un-checked>
Inbound: <checked>
Maximum Try Count: 2
Multi Part?: <un-checked>
For the 'Inbound Sequential Queue':
Message Provider: KAFKA
Queue JNDI Name: <topic-name-for-sqin>
Sequential: <checked>
Inbound: <checked>
Maximum Try Count: 2
Multi Part?: <un-checked>
For the 'Outbound Sequential Queue':
Message Provider: KAFKA
Queue JNDI Name: <topic-name-for-sqout>
Sequential: <checked>
Inbound: <un-checked>
Maximum Try Count: 2
Multi Part?: <un-checked>
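Before pressing <OK>, it is worth double-checking the four definitions against this summary (the topic names are the placeholders used above):

Queue                         Topic                      Sequential   Inbound
Continuous Outbound (CQOUT)   <topic-name-for-cqout>     no           no
Continuous Inbound (CQIN)     <topic-name-for-cqin>      no           yes
Sequential Inbound (SQIN)     <topic-name-for-sqin>      yes          yes
Sequential Outbound (SQOUT)   <topic-name-for-sqout>     yes          no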
Once the Queues are defined, press <OK> to close the dialog.
The next step is to define Cron Task instances to process messages for each of these Queues (and for each Partition of the Continuous Queues). To do so, go to the Cron Task Setup application and look for the Kafka Cron Task in the list; there should be four instances already defined.
Let’s rework those instances considering the information below:
Cron Task Instance Name: kafkacqin-0
Schedule: 1m,*,*,*,*,*,*,*,*,*
Run as User: MAXADMIN
Active: <checked>
Keep History: <checked>
Max Number of History Records: 20
Feel free to define Schedule, Run as User, Keep History and Max Number of History Records as appropriate for your system.
And then under Cron Task Properties, the following:
MESSAGEHUB: KAFKA
MESSAGEPROCESSOR: psdi.iface.jms.QueueToMaximoProcessor
PARTITION: 0
TOPIC: <topic-name-for-cqin>
This will create an instance to process records for Partition 0 of the Continuous Inbound Queue (legacy CQIN).
Create additional instances for the other Partitions, such as “kafkacqin-1” and “kafkacqin-2”. Use the proper value for the PARTITION parameter and note that partition numbers start at zero.
Repeat for the other instances (legacy SQIN, SQOUT and CQOUT), remembering to set up one instance per Partition of each Queue. The SQINERR instance can be left inactive as it is deprecated.
Also note that for Inbound Queues the proper value for MESSAGEPROCESSOR is “psdi.iface.jms.QueueToMaximoProcessor”, whereas for Outbound Queues it is “psdi.iface.jms.QueueToDestManagerProcessor”.
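Putting this together, a possible instance layout (our own naming convention, not a requirement; every instance uses MESSAGEHUB: KAFKA) could look as follows, assuming 3 partitions on the continuous topics and a single partition on the sequential topics to preserve message ordering:

Instance          Topic                     Partition   Message Processor
kafkacqin-0..2    <topic-name-for-cqin>     0..2        psdi.iface.jms.QueueToMaximoProcessor
kafkacqout-0..2   <topic-name-for-cqout>    0..2        psdi.iface.jms.QueueToDestManagerProcessor
kafkasqin-0       <topic-name-for-sqin>     0           psdi.iface.jms.QueueToMaximoProcessor
kafkasqout-0      <topic-name-for-sqout>    0           psdi.iface.jms.QueueToDestManagerProcessor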
Finally, we can define the instances of the KAFKAERROR Cron Task to process errors on both Continuous Queues (legacy CQIN and CQOUT). To do so, load the KAFKAERROR Cron Task and configure the instances as shown below:
Cron Task Instance Name: kafkaerrorcqin
Schedule: 1m,*,*,*,*,*,*,*,*,*
Run as User: MAXADMIN
Active: <checked>
Keep History: <checked>
Max Number of History Records: 20
Feel free to define Schedule, Run as User, Keep History and Max Number of History Records as appropriate for your system.
And then under Cron Task Properties, the following:
MESSAGEPROCESSOR: psdi.iface.jms.QueueToMaximoProcessor
TOPIC: <topic-name-for-cqin>
This will create an instance to process errored records for the whole Continuous Inbound Queue (legacy CQIN).
Then we repeat for the Continuous Outbound Queue, adding another instance with the following information:
Cron Task Instance Name: kafkaerrorcqout
Schedule: 1m,*,*,*,*,*,*,*,*,*
Run as User: MAXADMIN
Active: <checked>
Keep History: <checked>
Max Number of History Records: 20
Feel free to define Schedule, Run as User, Keep History and Max Number of History Records as appropriate for your system.
And then under Cron Task Properties, the following:
MESSAGEPROCESSOR: psdi.iface.jms.QueueToDestManagerProcessor
TOPIC: <topic-name-for-cqout>
This will create an instance to process errored records for the whole Continuous Outbound Queue (legacy CQOUT).
The above should complete the configuration to have Kafka as the message provider for Maximo integrations. Check the documentation links for additional details (including related System Properties).
References:
https://www.ibm.com/support/pages/how-configure-kafka-maximo-manage-ibm-cloud