This blog shows how to use Apache to develop a customize authorization plugin for the kafka cluster.
Prerequisites
- Have a running OpenShift cluster
- Run a Kafka cluster with strimzi kafka operator
- Install kubectl, OpenShift CLI and curl on host
Expose the Kafka Cluster by KafkaBridge
To simplify the configuration setting for the kafka. I provision the kafka by strimzi-kafka-operator. In order to make Kafka expose interfaces externally like other services, I use KafkaBridge
to transform it into an HTTP service.
- Create the
KafkaBridge
# namespaceKAFKA_NAMESPACE=kafka# create kafka bridge instancecat <<EOF | oc apply -f -apiVersion: kafka.strimzi.io/v1beta2kind: KafkaBridgemetadata: name: strimzi-kafka-bridge namespace: ${KAFKA_NAMESPACE}spec: bootstrapServers: kafka-kafka-bootstrap.${KAFKA_NAMESPACE}.svc:9092 http: port: 8080 replicas: 1EOF
- Verification
KAFKA_NAMESPACE=kafka# forward 8080 by bridge podkubectl -n ${KAFKA_NAMESPACE} port-forward $(kubectl get pods -l strimzi.io/cluster=strimzi-kafka-bridge -n ${KAFKA_NAMESPACE} -o jsonpath="{.items[0].metadata.name}") 8080:8080# or forward 8080 by svckubectl -n ${KAFKA_NAMESPACE} port-forward svc/$(kubectl get svc -l strimzi.io/cluster=strimzi-kafka-bridge -n ${KAFKA_NAMESPACE} -o jsonpath="{.items[0].metadata.name}") 8080:8080# list topiccurl http://localhost:8080/topics# consume message with the consumerwhile true; do curl -X GET http://localhost:8080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/records \-H 'accept: application/vnd.kafka.json.v2+json'; sleep 1; done
Running on Openshift
- Install on ROSA
oc create sa -sa -n oc adm policy add-scc-to-user anyuid -z -sa -n helm install / \ --set gateway.type=NodePort \ --set etcd.podSecurityContext.enabled=false \ --set etcd.containerSecurityContext.enabled=false \ --set serviceAccount.name=-sa \ --namespace
- Configure the Kafka Route with Admin API
# forward 9180 port to local hostkubectl -n port-forward $(kubectl get pods -l app.kubernetes.io/name= -n -o jsonpath="{.items[0].metadata.name}") 9180:9180# the bridge service name can be accessed by# kubectl get svc -l strimzi.io/cluster=strimzi-kafka-bridge -n $KAFKA_NAMESPACE -o jsonpath="{.items[0].metadata.name}"curl "http://127.0.0.1:9180//admin/routes/1" \-H "X-API-KEY: edd1c9f034335f136f87ad84b625c8f1" -X PUT -d '{ "methods": ["GET", "POST", "DELETE", "PUT"], "host": "example.com", "uri": "/*", "plugins": { "ext-plugin-post-resp": { "conf": [ {"name":"my-response-rewrite", "value":"{\"tag\":\"\"}"} ] } }, "upstream": { "type": "roundrobin", "nodes": { "strimzi-kafka-bridge-bridge-service.kafka.svc:8080": 1 } }}'
- Request the Kafka Service with Client API
# forward the http api of to local hostkubectl -n port-forward $(kubectl get pods -l app.kubernetes.io/name= -n -o jsonpath="{.items[0].metadata.name}") 9080:9080# list topiccurl --verbose --header "Host: example.com" http://localhost:9080/topics# send message to the topiccurl --header "Host: example.com" --location 'http://localhost:9080/topics/event' -H 'Content-Type: application/vnd.kafka.json.v2+json' --data \'{ "records":[ { "key":"event5", "value": "hello5" }, { "key":"event6", "value": "world6" } ]}'# create a kafka consumer in a new consumer groupcurl --header "Host: example.com" -X POST http://localhost:9080/consumers/strimzi-kafka-consumer-group \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "name": "strimzi-kafka-consumer", "auto.offset.reset": "earliest", "format": "json", "enable.auto.commit": true, "fetch.min.bytes": 512, "consumer.request.timeout.ms": 30000 }'# subscribe to the topiccurl --header "Host: example.com" -X POST http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/subscription \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "topics": [ "event" ]}'# consume message with the consumerwhile true; do curl --header "Host: example.com" -X GET http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/records \-H 'accept: application/vnd.kafka.json.v2+json'; sleep 1; done
Develop an Authentication Plugin with Golang
Develop a validation plugin for the certificates
I develop the plugin leverage the Go plugin runner. The plugin is just read the certific