Integrate Apache APISIX with Kafka for real-time log monitoring


This article describes how to use the kafka-logger plugin with Apache APISIX. With continued enhancements, the plugin now offers mature and complete functionality.

Apache Kafka is an open-source stream-processing platform managed by the Apache Software Foundation, written in Scala and Java. It provides a unified, high-throughput, low-latency platform for real-time data processing.

Apache Kafka's persistence layer is essentially a "massive publish/subscribe message queue following a distributed transaction logging architecture," making it valuable as an enterprise-class infrastructure for processing streaming data. It is used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Implementation: kafka-logger

Apache APISIX has supported Apache Kafka since version 1.2, when the kafka-logger plugin was first released. kafka-logger has been enhanced several times since then and now provides very mature and complete functionality. It supports pushing API request logs, including request and response bodies, to a Kafka cluster in JSON format.

When using kafka-logger, users can send a wide range of data and customize the format of the logs. kafka-logger supports sending logs in batches and automatically retrying failed deliveries.
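For example, the log format can be customized through the plugin metadata of kafka-logger. The sketch below is illustrative: the fields shown are examples, and the values are built-in NGINX variables.

```shell
# set a custom log format for kafka-logger via the APISIX Admin API
curl 'http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger' \
--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \
-X PUT --data-raw '{
    "log_format": {
        "host": "$host",
        "@timestamp": "$time_iso8601",
        "client_ip": "$remote_addr"
    }
}'
```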

How to Use

Step 1: Start Kafka Cluster

This article demonstrates only one way to start the cluster. Please refer to the official documentation for other ways to start it.

```yaml
# Start a cluster with 1 ZooKeeper node and 3 Kafka nodes using docker-compose.
# An EFAK instance is also started for data monitoring.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.1
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zookeeper:2888:3888
  kafka1:
    image: confluentinc/cp-kafka:6.2.1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zookeeper
  kafka2:
    image: confluentinc/cp-kafka:6.2.1
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zookeeper
  kafka3:
    image: confluentinc/cp-kafka:6.2.1
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zookeeper
  efak:
    image: nickzurich/kafka-eagle:2.0.9
    hostname: efak
    ports:
      - "8048:8048"
    depends_on:
      - kafka1
      - kafka2
      - kafka3
```
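Assuming the file above is saved as docker-compose.yml, the whole cluster can then be brought up in one step:

```shell
# start ZooKeeper, the three Kafka brokers, and EFAK in the background
docker-compose up -d
```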

Step 2: Create a Topic

Next, create a topic named test to collect the logs.
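One way to do this is with the kafka-topics CLI that ships in the cp-kafka image. The sketch below assumes the docker-compose services above; the partition and replication counts are illustrative:

```shell
# create the "test" topic on the cluster via the kafka1 container
docker exec kafka1 kafka-topics --create \
  --topic test \
  --partitions 3 \
  --replication-factor 1 \
  --bootstrap-server kafka1:19092
```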

Step 3: Create a Route and Enable the Plugin

The following command creates a route and enables the kafka-logger plugin. It sends logs to the test topic created in Step 2; the upstream node below is a placeholder, so replace 127.0.0.1:80 with your actual upstream.

```shell
curl -XPUT 'http://127.0.0.1:9080/apisix/admin/routes/r1' \
--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \
--header 'Content-Type: application/json' \
--data-raw '{
    "uri": "/*",
    "plugins": {
        "kafka-logger": {
            "batch_max_size": 1,
            "broker_list": {
                "127.0.0.1": 9092
            },
            "disable": false,
            "kafka_topic": "test"
        }
    },
    "upstream": {
        "nodes": {
            "127.0.0.1:80": 1
        },
        "type": "roundrobin"
    }
}'
```
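To verify the setup, send a request through APISIX and then consume the topic. The sketch below uses a /hello path purely as an example, since the route above matches /*:

```shell
# send a request through the gateway so kafka-logger produces a log entry
curl -i http://127.0.0.1:9080/hello

# consume the test topic to confirm the JSON log arrives
docker exec kafka1 kafka-console-consumer \
  --bootstrap-server kafka1:19092 \
  --topic test \
  --from-beginning
```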
