Access the Kafka Cluster by Gateway

admin, 2025-01-12

This post shows how to use Apache to develop a custom authorization plugin for the Kafka cluster.

Prerequisites

  • Have a running OpenShift cluster
  • Run a Kafka cluster with the Strimzi Kafka operator
  • Install kubectl, the OpenShift CLI (oc), and curl on the host

Expose the Kafka Cluster by KafkaBridge

To simplify the Kafka configuration, I provision the cluster with strimzi-kafka-operator. To expose Kafka externally like any other service, I use KafkaBridge to turn it into an HTTP service.

  • Create the KafkaBridge
# namespace
KAFKA_NAMESPACE=kafka
# create kafka bridge instance
cat <<EOF | oc apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: strimzi-kafka-bridge
  namespace: ${KAFKA_NAMESPACE}
spec:
  bootstrapServers: kafka-kafka-bootstrap.${KAFKA_NAMESPACE}.svc:9092
  http:
    port: 8080
  replicas: 1
EOF
  • Verification
KAFKA_NAMESPACE=kafka
# forward 8080 by bridge pod
kubectl -n ${KAFKA_NAMESPACE} port-forward $(kubectl get pods -l strimzi.io/cluster=strimzi-kafka-bridge -n ${KAFKA_NAMESPACE} -o jsonpath="{.items[0].metadata.name}") 8080:8080
# or forward 8080 by svc
kubectl -n ${KAFKA_NAMESPACE} port-forward svc/$(kubectl get svc -l strimzi.io/cluster=strimzi-kafka-bridge -n ${KAFKA_NAMESPACE} -o jsonpath="{.items[0].metadata.name}") 8080:8080
# list topics
curl http://localhost:8080/topics
# consume messages with the consumer
while true; do curl -X GET http://localhost:8080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/records \
-H 'accept: application/vnd.kafka.json.v2+json'; sleep 1; done
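
The polling loop above runs until it is interrupted, which is awkward in a script. As a minimal sketch (not part of the original setup), the same request can be wrapped in a function that polls a fixed number of times. The function name `poll_records` and the `POLL_INTERVAL` variable are hypothetical names introduced for this example; the URL layout and the `accept` header are the ones used above.

```shell
# Poll a Kafka Bridge consumer a fixed number of times instead of forever.
# Arguments: base URL, consumer group, consumer instance, attempts (default 5).
# POLL_INTERVAL (seconds between requests, default 1) is a hypothetical knob.
# Note: plain POSIX sh has no local variables, so these names are global.
poll_records() {
  base_url=$1
  group=$2
  instance=$3
  attempts=${4:-5}
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # Same request as the endless loop, with the URL built from the arguments.
    curl -s -X GET "${base_url}/consumers/${group}/instances/${instance}/records" \
      -H 'accept: application/vnd.kafka.json.v2+json'
    echo    # separate the responses with a newline
    i=$((i + 1))
    sleep "${POLL_INTERVAL:-1}"
  done
}
```

For example, `poll_records http://localhost:8080 strimzi-kafka-consumer-group strimzi-kafka-consumer 10` would issue ten requests against the port-forwarded bridge and then return.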

Running on OpenShift

  • Install on ROSA
oc create sa -sa -n 
oc adm policy add-scc-to-user anyuid -z -sa -n 
helm install  / \
  --set gateway.type=NodePort \
  --set etcd.podSecurityContext.enabled=false \
  --set etcd.containerSecurityContext.enabled=false \
  --set serviceAccount.name=-sa \
  --namespace 
  • Configure the Kafka Route with Admin API
# forward 9180 port to local host
kubectl -n  port-forward $(kubectl get pods -l app.kubernetes.io/name= -n  -o jsonpath="{.items[0].metadata.name}") 9180:9180
# the bridge service name can be accessed by
# kubectl get svc -l strimzi.io/cluster=strimzi-kafka-bridge -n $KAFKA_NAMESPACE -o jsonpath="{.items[0].metadata.name}"
curl "http://127.0.0.1:9180//admin/routes/1" \
-H "X-API-KEY: edd1c9f034335f136f87ad84b625c8f1" -X PUT -d '{
  "methods": ["GET", "POST", "DELETE", "PUT"],
  "host": "example.com",
  "uri": "/*",
  "plugins": {
    "ext-plugin-post-resp": {
      "conf": [
        {"name":"my-response-rewrite", "value":"{\"tag\":\"\"}"}
      ]
    }
  },
  "upstream": {
    "type": "roundrobin",
    "nodes": {
      "strimzi-kafka-bridge-bridge-service.kafka.svc:8080": 1
    }
  }
}'
  • Request the Kafka Service with Client API
# forward the http api of  to local host
kubectl -n  port-forward $(kubectl get pods -l app.kubernetes.io/name= -n  -o jsonpath="{.items[0].metadata.name}") 9080:9080
# list topics
curl --verbose --header "Host: example.com" http://localhost:9080/topics
# send messages to the topic
curl --header "Host: example.com" --location 'http://localhost:9080/topics/event' -H 'Content-Type: application/vnd.kafka.json.v2+json' --data \
'{
   "records":[
      {
         "key":"event5",
         "value": "hello5"
      },
      {
         "key":"event6",
         "value": "world6"
      }
   ]
}'
# create a kafka consumer in a new consumer group
curl --header "Host: example.com" -X POST http://localhost:9080/consumers/strimzi-kafka-consumer-group \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "name": "strimzi-kafka-consumer",
    "auto.offset.reset": "earliest",
    "format": "json",
    "enable.auto.commit": true,
    "fetch.min.bytes": 512,
    "consumer.request.timeout.ms": 30000
  }'
# subscribe to the topic
curl --header "Host: example.com" -X POST http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/subscription \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "topics": [
        "event"
    ]
}'
# consume messages with the consumer
while true; do curl --header "Host: example.com" -X GET http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/records \
-H 'accept: application/vnd.kafka.json.v2+json'; sleep 1; done
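
Hand-writing the records JSON gets error-prone once there are more than a couple of messages. The sketch below is one possible way to generate the same payload shape from `key=value` arguments and send it through the gateway. The function names `build_records_payload` and `produce_records`, and the `GATEWAY_URL` and `GATEWAY_HOST` variables, are hypothetical names for this example; the endpoint, Host header, and content type are the ones used above. It assumes keys and values contain no double quotes or backslashes, since it does no JSON escaping.

```shell
# Build a Kafka Bridge "records" payload from key=value arguments.
# Assumption: keys and values contain no double quotes or backslashes,
# because this sketch does not escape JSON.
# Note: plain POSIX sh has no local variables, so these names are global.
build_records_payload() {
  out='{"records":['
  sep=''
  for pair in "$@"; do
    key=${pair%%=*}     # text before the first "="
    value=${pair#*=}    # text after the first "="
    out="${out}${sep}{\"key\":\"${key}\",\"value\":\"${value}\"}"
    sep=','
  done
  printf '%s]}\n' "$out"
}

# Send the generated payload to a topic through the gateway.
# GATEWAY_URL and GATEWAY_HOST are hypothetical variables; their defaults
# match the port-forward and route host used in the commands above.
produce_records() {
  topic=$1
  shift
  curl -s --header "Host: ${GATEWAY_HOST:-example.com}" \
    -H 'Content-Type: application/vnd.kafka.json.v2+json' \
    --data "$(build_records_payload "$@")" \
    "${GATEWAY_URL:-http://localhost:9080}/topics/${topic}"
}
```

With the port-forward running, `produce_records event event5=hello5 event6=world6` would post the same two records as the manual curl command.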

Develop an Authentication Plugin with Golang

  • Develop a validation plugin for the certificates

    I develop the plugin with the Go plugin runner. The plugin just reads the certific
