This article describes how to use the kafka-logger plugin with . Wiht enhancements, the plugin now has mature and complete functions.
Apache Kafka is an open source stream processing platform managed by Apache, written in Scala and Java. Apache Kafka provides uniformed, high-throughput, low-latency functionality for real-time data processing.
Apache Kafka's persistence layer is essentially a "massive publish/subscribe message queue following a distributed transaction logging architecture," making it valuable as an enterprise-class infrastructure for processing streaming data. It is used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Implementation: kafka-logger
Apache has been providing support for Apache Kafka since version 1.2 with the kafka-logger
plugin release. kafka-logger
has been enhanced several times since then to provide very mature and complete functionality. It supports pushing API request logs, request bodies, and response bodies, to a Kafka cluster in JSON format.
When using kafka-logger
, users can send a wide range of data and customize the format of the logs sent. kafka-logger
supports sending logs in a packaged manner in a batch or for automatic retries.
How to use
Step 1: Start Kafka Cluster
This article only demonstrates one way to start the cluster. Please refer to the official documentation for details of other ways to start the cluster.
# Start a cluster with 1 ZooKeeper node and 3 kafka nodes using docker-compose# At the same time, an EFAK is started for data monitoring.version: '3'services: zookeeper: image: confluentinc/cp-zookeeper:6.2.1 hostname: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_SERVERS: zookeeper:2888:3888 kafka1: image: confluentinc/cp-kafka:6.2.1 hostname: kafka1 ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" depends_on: - zookeeper kafka2: image: confluentinc/cp-kafka:6.2.1 hostname: kafka2 ports: - "9093:9093" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 2 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" depends_on: - zookeeper kafka3: image: confluentinc/cp-kafka:6.2.1 hostname: kafka3 ports: - "9094:9094" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 3 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" depends_on: - zookeeper efak: image: nickzurich/kafka-eagle:2.0.9 hostname: efak ports: - "8048:8048" depends_on: - kafka1 - kafka2 - kafka3
Step 2: Create Topic
Next we create the test Topic
to collect logs.
Step 3: Create Routing and Enable Plugin
The following commands allow you to create routes and enable the kafka-logger
plugin.
curl -XPUT 'http://127.0.0.1:9080//admin/routes/r1' \--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \--header 'Content-Type: application/json' \--data-raw '{ "uri": "/*", "plugins": { "kafka-logger": { "batch_max_size": 1, "broker_list": { "127.0.0.1": 9092 }, "disable": false,<span class="token-line" style="color: rgb(57, 58,