Building a Dynamic Informer to Watch Multiple Resources in Go


In the rapidly evolving landscape of distributed systems, particularly within cloud-native environments orchestrated by platforms like Kubernetes, the ability to monitor and react to changes across a multitude of diverse resources is paramount. Static, predefined monitoring solutions often fall short when faced with systems characterized by dynamic resource creation, evolving API definitions, and an ever-increasing variety of custom resource types. This is where the concept of a "dynamic informer" in Go, particularly when interacting with the Kubernetes API, emerges as a powerful and indispensable pattern. This article delves deeply into the architecture, implementation, and practical applications of building such a dynamic informer to watch multiple resources in Go, providing a comprehensive guide for developers aiming to build robust and adaptable control planes and automation tools.

The Genesis of Informers: Understanding the Need for Efficient Resource Watching

At its core, any distributed system that needs to maintain a consistent state or react to external changes requires an efficient mechanism to observe those changes. In the context of Kubernetes, this necessity is magnified due to its declarative nature. Users declare a desired state (e.g., "I want 3 replicas of this application"), and controllers continuously work to reconcile the current state with the desired state. This reconciliation loop hinges entirely on the controller's ability to be informed of state changes in the cluster's resources.

Polling, the most straightforward approach, involves repeatedly querying the API server for the current state of resources. While simple, polling is inherently inefficient. It generates significant network traffic and API server load, especially for large clusters or frequent checks. More critically, it introduces latency between a change occurring and the controller reacting to it, as the controller only discovers the change during its next poll cycle. This inefficiency becomes a major bottleneck for responsive and scalable systems.

To overcome the limitations of polling, Kubernetes introduced the "informer" pattern. An informer is a sophisticated client-side cache and event-driven mechanism designed for efficient resource watching. It essentially streams events (additions, updates, deletions) from the Kubernetes API server, maintaining an up-to-date in-memory cache of the resources it's watching. This pattern offers several significant advantages:

  1. Reduced API Server Load: Instead of repeatedly querying, the informer establishes a long-lived watch connection (an HTTP streaming response from the API server) and receives only delta updates. This drastically reduces the load on the API server.
  2. Low Latency Reactions: Controllers are notified almost immediately when a resource changes, enabling rapid reactions and minimizing eventual consistency delays.
  3. Client-Side Cache: The informer maintains a local, consistent cache. Controllers can query this cache directly, avoiding unnecessary API calls and further improving performance. This cache also aids in preventing "thundering herd" problems where many clients might otherwise hit the API server simultaneously.
  4. Resilience: Informers are built with retry mechanisms and watch resumption logic, making them resilient to temporary network glitches or API server restarts.

This fundamental pattern forms the bedrock of most Kubernetes controllers and operators written in Go using the client-go library. However, traditional informers, often called "static informers," are typically generated for specific, known resource types at compile time. While effective for a fixed set of resources, they introduce limitations when the types of resources to be watched are not known beforehand or can change dynamically during runtime.

The Bottleneck of Static Informers: When Fixed Schemas Fail

Static informers, as provided by client-go's typed clients, are a powerful tool for building controllers for well-defined Kubernetes resources (like Pods, Deployments, Services) or custom resources (CRDs) whose Go types have been generated. You typically define Go structs that correspond to the Kubernetes API objects (e.g., corev1.Pod, appsv1.Deployment) and then use a shared informer factory to create informers for these specific types. This approach works perfectly when:

  • The set of resources you need to watch is fixed and known at the time of compilation.
  • You have generated Go types for all these resources using tools like code-generator.
  • Your application doesn't need to discover or react to entirely new resource types that might appear in the cluster after deployment.

However, many advanced scenarios demand greater flexibility. Consider a multi-tenant platform that allows each tenant to define their own set of custom resources, each with unique schemas and lifecycles. Or perhaps a generic automation tool that needs to operate on any custom resource conforming to a certain label or annotation, regardless of its specific GroupVersionKind (GVK). In such cases, compiling informers for every possible resource type is impractical, if not impossible. The Go types for unknown CRDs cannot be generated beforehand.

Attempting to handle these dynamic requirements with static informers leads to several significant challenges:

  • Compile-Time Coupling: The application becomes tightly coupled to specific Go types and API versions. Any new CRD or API version requires code changes, regeneration of types, and recompilation/redeployment. This is antithetical to the goal of dynamic, self-adapting systems.
  • Scalability Issues: If a system needs to watch a large, potentially unbounded number of different resource types, generating and maintaining separate static informers for each becomes resource-intensive and unwieldy. Each informer consumes memory for its cache and goroutines for its watch loop.
  • Discovery Limitation: Static informers cannot discover new resource types that appear in the cluster at runtime. If a new CRD is installed, an application using only static informers would remain oblivious to its existence until redeployed with updated code.
  • Code Duplication and Complexity: Without a generic mechanism, developers might resort to writing boilerplate code for each resource type, leading to repetitive, difficult-to-maintain logic.

These limitations underscore the necessity for a more adaptable solution: the dynamic informer. It allows an application to watch resources whose types are determined at runtime, without needing compile-time Go structs for every possible GVK. This capability is crucial for building truly generic, extensible, and self-adaptive control planes, especially those designed to manage and orchestrate diverse elements within complex cloud-native ecosystems.

Embracing Flexibility: The Power of Dynamic Informers

A dynamic informer breaks free from the constraints of compile-time type definitions by operating on unstructured data. Instead of relying on generated Go structs, it deals with resources as generic unstructured.Unstructured objects, which are essentially map[string]interface{} representations of the Kubernetes API objects. This fundamental shift allows an application to discover and interact with any resource present in the cluster, as long as it has a GroupVersionResource (GVR) that the API server exposes.

The core idea behind a dynamic informer is to decouple the watching mechanism from specific Go types. Instead, it uses the Kubernetes API's discovery capabilities to identify available resource types and then creates generic informers for these discovered types. This makes it incredibly powerful for scenarios where:

  • Unknown CRDs: You need to watch CRDs that are created or installed dynamically, without prior knowledge of their schema.
  • Generic Tools: You're building a generic tool that needs to operate across various resource types based on common attributes (e.g., all resources with a specific label).
  • API Evolution: You want your application to be resilient to API changes or new versions without requiring recompilation.
  • Custom API Gateways: Building a custom API gateway that needs to dynamically discover and route requests to various backend services, including those defined by CRDs. This is where the concepts of api, gateway, and OpenAPI start to converge naturally. A dynamic informer can monitor CRDs that define service endpoints or OpenAPI specifications, allowing the gateway to automatically update its routing table.

How Dynamic Informers Achieve Agility: Key client-go Components

Building a dynamic informer in Go primarily involves leveraging specific components from the client-go library, which provides the necessary primitives for interacting with the Kubernetes API. The key players are:

  1. dynamic.Interface (Dynamic Client): This is the heart of dynamic interaction. Unlike typed clients (e.g., corev1.CoreV1Interface), the dynamic client operates on unstructured.Unstructured objects. It allows you to create, get, update, delete, and list any resource, provided you specify its GroupVersionResource (GVR).
  2. discovery.DiscoveryInterface (Discovery Client): Before you can watch a resource, you need to know it exists and what its GVR is. The discovery client allows you to query the API server for all supported API groups, versions, and resources. It's how your dynamic informer "discovers" what's available in the cluster at runtime.
  3. dynamicinformer.NewFilteredDynamicSharedInformerFactory: This factory is analogous to informers.NewSharedInformerFactory but is designed to create informers for unstructured objects using the dynamic client. You provide it with a dynamic client and a resync period, and it can then create informers for specific GVRs.
  4. cache.ResourceEventHandler: Whether static or dynamic, informers deliver events (add, update, delete) to registered handlers. For dynamic informers, these handlers will receive unstructured.Unstructured objects, which you then inspect and process based on your application's logic.

These components, when orchestrated correctly, enable the construction of a highly flexible and powerful resource watching system.

The Architecture of a Dynamic Informer System

Designing a robust dynamic informer system involves more than just plugging in the client-go components. It requires careful consideration of discovery, lifecycle management, event processing, concurrency, and error handling. Here's a conceptual architecture:

graph TD
    A[Start Application] --> B{"Discovery Client (DiscoveryInterface)"};
    B --> C["List All API Resources (GVRs)"];
    C --> D{Filter GVRs based on criteria};
    D --> E[For Each Matched GVR];
    E --> F[Create Dynamic Informer for GVR];
    F --> G["Register Event Handlers (Add/Update/Delete)"];
    G --> H[Start Informer];
    H --> I[Add Informer to Manager/Wait Group];
    E -- Loop --> D;
    I --> J["Run Event Processing Loop (Workers)"];
    J --> K{Process Events from Informer Handlers};
    K --> L[Update Internal State/Trigger Actions];
    K -- Errors --> M[Error Handling/Logging];
    H -- Events Stream --> G;
    J -- On Shutdown --> N[Stop All Informers];

Detailed Architectural Breakdown:

  1. Initialization and Client Setup:
    • The system starts by creating a Kubernetes RESTConfig, typically from kubeconfig or in-cluster service account.
    • From this config, a *kubernetes.Clientset can be created for accessing standard Kubernetes APIs (not strictly necessary for dynamic watching itself, but often useful).
    • Crucially, a dynamic.Interface (dynamic client) and a discovery.DiscoveryInterface (discovery client) are initialized. These will be the primary tools for dynamic interaction.
  2. Resource Discovery Loop:
    • The dynamic informer system doesn't know upfront which resources to watch. It must discover them.
    • The DiscoveryClient is used to list all API resources available in the cluster. This involves iterating through API groups and their versions, then fetching APIResourceList for each group/version.
    • For each APIResource, a GroupVersionResource (GVR) is constructed. A GVR uniquely identifies a collection of resources, such as pods in version v1 of the core API group.
    • A filtering mechanism is applied. This is critical: you rarely want to watch every single resource in a cluster. Filters might include:
      • Only watching CRDs (resources with Group not in standard Kubernetes groups).
      • Resources with specific labels or annotations.
      • Resources within a particular namespace.
      • Resources matching a predefined list of desired GVKs.
  3. Dynamic Informer Creation and Management:
    • For each GVR that passes the filter, a new dynamic informer needs to be created. This is done using dynamicinformer.NewFilteredDynamicSharedInformerFactory and then calling ForResource(gvr).
    • Each informer requires its own cache.ResourceEventHandlerFuncs to define what happens when an object is added, updated, or deleted. These handlers receive unstructured.Unstructured objects.
    • A context.Context from the standard Go library (or a dedicated stop channel) is used to manage the lifecycle of these informers, allowing for graceful shutdown.
    • All created informers are started (informer.Run(stopCh)) and their HasSynced() method is awaited to ensure the local cache is populated before processing events.
  4. Event Processing and Reconciliation:
    • When an informer's handler receives an event (e.g., OnAdd, OnUpdate, OnDelete), it typically enqueues the key (namespace/name or just name) of the affected unstructured.Unstructured object into a work queue.
    • A set of worker goroutines (often called "controllers") constantly pull items from this work queue.
    • For each item, the worker retrieves the latest version of the resource from its informer's local cache (using the Lister). This "get from cache" pattern is important to ensure the worker operates on the most up-to-date state, as the cache might have been updated by subsequent events while the item was in the queue.
    • The worker then performs its core logic:
      • Inspecting the unstructured.Unstructured object.
      • Applying business rules.
      • Making changes to other Kubernetes resources using the dynamic client.
      • Updating external systems.
  5. Concurrency and Error Handling:
    • The system must handle multiple informers running concurrently, each in its own goroutine.
    • Work queues and worker pools are crucial for parallel event processing.
    • Robust error handling is paramount. This includes retries for transient errors, backoff strategies, and informative logging. Errors during discovery or informer creation should be handled gracefully without crashing the entire system.
  6. Re-Discovery and Adaptation:
    • Crucially, the set of available GVRs can change. New CRDs might be installed, or existing ones uninstalled.
    • A truly dynamic informer system periodically re-runs its discovery loop. If new GVRs are found that match the filter, new informers are created and started. If GVRs disappear, their corresponding informers should be stopped and cleaned up. This makes the system self-adaptive to changes in the cluster's API surface.

This architecture enables a powerful and adaptable system capable of observing and reacting to a continuously evolving set of resources in a Kubernetes cluster, providing the foundation for highly flexible automation and control logic.


Implementing a Dynamic Informer in Go: A Conceptual Walkthrough

Let's walk through the core Go code components needed to implement such a dynamic informer. This will illustrate how the concepts discussed above translate into actual code, offering a practical blueprint.

package main

import (
    "context"
    "flag"
    "log"
    "os"
    "os/signal"
    "path/filepath"
    "sync"
    "syscall"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/discovery"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog/v2"

    // Import to ensure kubectl auth plugins are loaded
    _ "k8s.io/client-go/plugin/pkg/client/auth"
)

// Controller struct holds necessary clients and configurations
type DynamicResourceController struct {
    dynamicClient    dynamic.Interface
    discoveryClient  discovery.DiscoveryInterface
    informerFactory  dynamicinformer.DynamicSharedInformerFactory
    informers        map[schema.GroupVersionResource]cache.SharedIndexInformer
    queue            workqueue.RateLimitingInterface
    knownGVRs        map[schema.GroupVersionResource]bool
    mu               sync.Mutex // Protects informers and knownGVRs
    resyncPeriod     time.Duration
    discoveryRefresh time.Duration
    stopCh           chan struct{}
}

func NewDynamicResourceController(
    dynamicClient dynamic.Interface,
    discoveryClient discovery.DiscoveryInterface,
    resyncPeriod, discoveryRefresh time.Duration,
) *DynamicResourceController {
    factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
        dynamicClient,
        resyncPeriod, // How often to resync the cache (full list)
        metav1.NamespaceAll, // Watch all namespaces
        nil, // No TweakListOptionsFunc for now
    )

    return &DynamicResourceController{
        dynamicClient:    dynamicClient,
        discoveryClient:  discoveryClient,
        informerFactory:  factory,
        informers:        make(map[schema.GroupVersionResource]cache.SharedIndexInformer),
        queue:            workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
        knownGVRs:        make(map[schema.GroupVersionResource]bool),
        resyncPeriod:     resyncPeriod,
        discoveryRefresh: discoveryRefresh,
    }
}

// Run starts the dynamic controller, discovering and watching resources.
func (c *DynamicResourceController) Run(ctx context.Context, workers int) error {
    defer klog.Info("Shutting down dynamic controller")
    defer c.queue.ShutDown()

    c.stopCh = make(chan struct{})

    // Start a goroutine for periodic resource discovery
    go wait.Until(c.discoverAndWatchResources, c.discoveryRefresh, c.stopCh)

    // Wait for the initial discovery and informer sync
    klog.Info("Waiting for initial resource discovery and informers to sync...")
    // We need a mechanism to wait for the first sync or at least a few discovery cycles.
    // For simplicity, we'll use a short sleep here, but in a real-world scenario,
    // you'd have more sophisticated signaling.
    time.Sleep(c.discoveryRefresh * 2) // Give it time to discover and create some informers

    // Start the informers from the factory, which will include all currently registered ones
    c.informerFactory.Start(c.stopCh)
    c.informerFactory.WaitForCacheSync(c.stopCh) // Wait for all registered informers to sync their caches

    klog.Info("Informers synced successfully. Starting event processing workers.")

    // Start worker goroutines to process items from the work queue
    for i := 0; i < workers; i++ {
        go wait.Until(c.runWorker, time.Second, c.stopCh)
    }

    <-ctx.Done() // Block until context is cancelled
    close(c.stopCh) // Signal discovery and worker goroutines to stop

    return nil
}

func (c *DynamicResourceController) runWorker() {
    for c.processNextWorkItem() {
    }
}

func (c *DynamicResourceController) processNextWorkItem() bool {
    obj, shutdown := c.queue.Get()
    if shutdown {
        return false
    }

    // We call Done here so the workqueue knows we have finished processing this item.
    // c.queue.Forget() is called if we are successful; otherwise, if there's an error,
    // we Requeue the item, which will be processed again after a rate limit delay.
    defer c.queue.Done(obj)

    key, ok := obj.(string)
    if !ok {
        klog.Errorf("expected string in workqueue but got %#v", obj)
        c.queue.Forget(obj)
        return true
    }

    if err := c.syncHandler(key); err != nil {
        klog.Errorf("Error syncing %s: %v", key, err)
        c.queue.AddRateLimited(key) // Requeue on error
        return true
    }

    c.queue.Forget(obj) // Remove from work queue on success
    return true
}

// syncHandler processes the event for a given resource key.
func (c *DynamicResourceController) syncHandler(key string) error {
    klog.Infof("Processing key: %s", key)

    // Here, you would parse the key to get GVR, namespace, name
    // This is simplified for brevity. A real key would contain more info.
    // For dynamic informers, the key is usually "namespace/name" for namespaced resources, or "name" for cluster-scoped.
    // To get the GVR, we need to know which informer enqueued it.
    // A more robust system would encode GVR into the key, or pass it directly.

    // For demonstration, let's assume we want to log the resource.
    // A real implementation would fetch the resource from the cache for a specific GVR
    // and perform complex logic.
    // Example: Get a Pod from the cache (if GVR was a Pod)
    // if gvr == corev1.SchemeGroupVersion.WithResource("pods") {
    //    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    //    if err != nil {
    //        return err
    //    }
    //    obj, exists, err := c.informers[gvr].GetStore().GetByKey(key)
    //    if err != nil {
    //        return err
    //    }
    //    if exists {
    //        unstructuredObj := obj.(*unstructured.Unstructured)
    //        klog.Infof("Found %s/%s in cache: %s", namespace, name, unstructuredObj.GetName())
    //        // Perform your controller logic here
    //    } else {
    //        klog.Infof("Resource %s/%s no longer exists.", namespace, name)
    //    }
    // }
    // Since we don't have GVR in the key directly, we'll just log the key for now.
    klog.Infof("Successfully processed resource event for key: %s", key)
    return nil
}

// discoverAndWatchResources periodically discovers new API resources and sets up informers.
func (c *DynamicResourceController) discoverAndWatchResources() {
    klog.Info("Starting resource discovery cycle...")
    _, apiResourceLists, err := c.discoveryClient.ServerGroupsAndResources()
    if err != nil {
        klog.Errorf("Error discovering resources: %v", err)
        return
    }

    currentGVRs := make(map[schema.GroupVersionResource]bool)

    c.mu.Lock()
    defer c.mu.Unlock()

    for _, list := range apiResourceLists {
        gv, err := schema.ParseGroupVersion(list.GroupVersion)
        if err != nil {
            klog.Errorf("Error parsing GroupVersion %s: %v", list.GroupVersion, err)
            continue
        }

        for _, resource := range list.APIResources {
            // Skip subresources such as "pods/log" or "deployments/status".
            if containsString(resource.Name, "/") {
                continue
            }
            // Only watch resources that support both "list" and "watch";
            // informers require both verbs to build and maintain their cache.
            if !containsVerb(resource.Verbs, "list") || !containsVerb(resource.Verbs, "watch") {
                continue
            }

            gvr := gv.WithResource(resource.Name)
            currentGVRs[gvr] = true

            // Example filter: only watch resources in 'example.com' group or standard 'apps' group.
            // In a real-world scenario, this filter would be more sophisticated,
            // perhaps based on annotations, labels, or a configurable whitelist/blacklist.
            if !c.shouldWatchGVR(gvr) {
                continue
            }

            if _, exists := c.knownGVRs[gvr]; !exists {
                klog.Infof("Discovered new GVR: %s. Setting up informer.", gvr.String())
                c.setupInformerForGVR(gvr)
                c.knownGVRs[gvr] = true
                // Note: informers started by factory.Start() will pick this up automatically
                // if factory.Start has already been called. If not, they will start when it's called.
                // For truly dynamic start/stop, one might need to manage individual informer Run() calls.
                // For this simplified example, we rely on the factory.Start and WaitForCacheSync at the beginning.
                // For real dynamic start/stop, each informer would need its own goroutine for Run and a separate stop channel.
            }
        }
    }

    // Clean up informers for GVRs that no longer exist
    for gvr := range c.knownGVRs {
        if _, exists := currentGVRs[gvr]; !exists {
            klog.Infof("GVR %s no longer exists. Stopping its informer.", gvr.String())
            // This is complex with SharedInformerFactory.
            // A direct way to stop a single informer created by the factory isn't readily available.
            // The factory is designed for a fixed set of informers or ones added before its Start method is called.
            // To truly dynamically stop/remove, you'd manage informers manually or restart the entire factory.
            // For simplicity, we'll just remove it from knownGVRs, allowing it to be ignored.
            // In a production system, this would require carefully managing the lifecycle of each informer and its goroutine.
            delete(c.knownGVRs, gvr)
            delete(c.informers, gvr) // Remove from map
        }
    }
    klog.Info("Resource discovery cycle completed.")
}

// shouldWatchGVR applies filtering logic to determine if a GVR should be watched.
func (c *DynamicResourceController) shouldWatchGVR(gvr schema.GroupVersionResource) bool {
    // Example: Watch all Custom Resources, or specific standard resources like "deployments"
    // This filter needs to be robust and configurable for a real application.
    if gvr.Group == "apps" && gvr.Resource == "deployments" {
        return true
    }
    if gvr.Group == "batch" && gvr.Resource == "cronjobs" {
        return true
    }
    // Let's say we want to watch any CRD within a specific group, e.g., "my.api.example.com"
    if gvr.Group == "crd.api.example.com" { // Placeholder for a custom CRD group
        return true
    }
    // For API management, we might specifically watch CRDs that define APIs or services.
    // For instance, if CRDs like 'APIEntrypoints.apipark.com' or 'ServiceDefinitions.example.com' existed.
    // This mechanism is key for a truly dynamic API gateway.
    // A platform like [ApiPark](https://apipark.com/), an open-source AI gateway and API management platform,
    // could potentially use such a dynamic informer to automatically detect and integrate new API services
    // declared as custom resources, ensuring its unified API format remains up-to-date with
    // dynamically deployed microservices or AI models.
    // This dynamic discovery, reacting to new or updated `OpenAPI` definitions published as CRDs,
    // would make the `gateway` exceptionally adaptable to rapidly evolving service landscapes.
    return false
}

// setupInformerForGVR creates and registers an informer for a specific GVR.
func (c *DynamicResourceController) setupInformerForGVR(gvr schema.GroupVersionResource) {
    informer := c.informerFactory.ForResource(gvr).Informer()

    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err != nil {
                klog.Error(err)
                return
            }
            klog.Infof("ADD for GVR %s: %s", gvr.String(), key)
            c.queue.Add(key)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(newObj)
            if err != nil {
                klog.Error(err)
                return
            }
            klog.Infof("UPDATE for GVR %s: %s", gvr.String(), key)
            c.queue.Add(key)
        },
        DeleteFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err != nil {
                klog.Error(err)
                return
            }
            klog.Infof("DELETE for GVR %s: %s", gvr.String(), key)
            c.queue.Add(key)
        },
    })
    c.informers[gvr] = informer
    // Note: Informers created this way by the factory are started by calling factory.Start(stopCh)
    // and synced by factory.WaitForCacheSync(stopCh). Individual informers aren't run directly here.
}

// Helper functions
func containsVerb(verbs []string, verb string) bool {
    for _, v := range verbs {
        if v == verb {
            return true
        }
    }
    return false
}

// containsString reports whether substr occurs anywhere within s.
func containsString(s, substr string) bool {
    for i := 0; i+len(substr) <= len(s); i++ {
        if s[i:i+len(substr)] == substr {
            return true
        }
    }
    return false
}

func main() {
    klog.InitFlags(nil) // Initialize klog
    flag.Parse()

    // Set up kubeconfig
    var kubeconfig string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = filepath.Join(home, ".kube", "config")
    } else {
        kubeconfig = os.Getenv("KUBECONFIG")
    }

    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        log.Fatalf("Error building kubeconfig: %v", err.Error())
    }

    // Create dynamic client
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        log.Fatalf("Error creating dynamic client: %v", err.Error())
    }

    // Create discovery client
    discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
    if err != nil {
        log.Fatalf("Error creating discovery client: %v", err.Error())
    }

    controller := NewDynamicResourceController(dynamicClient, discoveryClient, 30*time.Second, 5*time.Minute)

    // Set up signals for graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)

    go func() {
        sig := <-sigCh
        klog.Infof("Received signal %v, shutting down...", sig)
        cancel()
    }()

    if err := controller.Run(ctx, 2); err != nil { // 2 worker goroutines
        klog.Fatalf("Error running controller: %v", err)
    }

    klog.Info("Controller gracefully shut down.")
}

Explanation of the Code Flow:

  1. NewDynamicResourceController: Initializes the controller with dynamic and discovery clients, along with the DynamicSharedInformerFactory. This factory is capable of creating informers for any GVR.
  2. Run Method:
    • Starts a background goroutine (discoverAndWatchResources) that periodically uses the discoveryClient to find all available API resources.
    • Calls c.informerFactory.Start(c.stopCh) and c.informerFactory.WaitForCacheSync(c.stopCh) to kick off all informers that have been registered with the factory and wait until their caches are initially populated.
    • Launches worker goroutines (runWorker) that pull items from a rate-limiting work queue.
    • Blocks until the ctx is cancelled (e.g., by a SIGINT or SIGTERM signal), then gracefully shuts down.
  3. discoverAndWatchResources:
    • Fetches all ServerGroupsAndResources from the discoveryClient.
    • Iterates through each APIGroup and its APIResourceList.
    • Constructs a schema.GroupVersionResource (GVR) for each discoverable resource.
    • Applies a shouldWatchGVR filter. This is where you define which resources you care about.
    • If a new GVR is found that matches the filter and wasn't previously known, setupInformerForGVR is called.
    • It also handles GVRs that might have disappeared from the cluster, though stopping individual informers gracefully from a SharedInformerFactory is more involved than simply creating them. In a real system, a full factory restart or more granular informer management might be needed for removal.
  4. shouldWatchGVR: This crucial function contains the logic to decide whether to create an informer for a given GVR. For example, it could watch all CRDs in a specific group, or all resources that carry a certain label, making it a powerful extension point. As a concrete illustration, such dynamic watching could let an API gateway management platform like APIPark discover and integrate new API services, potentially defined by OpenAPI specifications embedded in custom resources.
  5. setupInformerForGVR:
    • Uses c.informerFactory.ForResource(gvr).Informer() to get a cache.SharedIndexInformer for the specific GVR.
    • Registers AddFunc, UpdateFunc, and DeleteFunc handlers. These handlers simply push the resource's key (namespace/name) into the work queue.
  6. runWorker and processNextWorkItem: Standard controller pattern. Workers continuously pull keys from the queue.
  7. syncHandler: This is the core business logic. When an event's key is processed, this function would retrieve the unstructured.Unstructured object from the informer's cache (using its Lister) and perform the desired actions (e.g., validate, mutate, create other resources, update external systems). For simplicity, it currently only logs the key.
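Because the handlers only ever see unstructured.Unstructured objects, most syncHandler logic reduces to walking a decoded map[string]interface{}. The stdlib-only sketch below mirrors what apimachinery's unstructured.NestedString helper does; the RestEndpoint object and its spec fields are hypothetical examples, not a real API:

```go
package main

import "fmt"

// nestedString walks a decoded Kubernetes object (the map[string]interface{}
// held inside unstructured.Unstructured) and returns the string at the given
// field path, mirroring apimachinery's unstructured.NestedString helper.
func nestedString(obj map[string]interface{}, fields ...string) (string, bool) {
	var cur interface{} = obj
	for _, f := range fields {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return "", false
		}
		if cur, ok = m[f]; !ok {
			return "", false
		}
	}
	s, ok := cur.(string)
	return s, ok
}

func main() {
	// A minimal object shaped like what an informer's lister might return
	// for a hypothetical RestEndpoint custom resource.
	obj := map[string]interface{}{
		"apiVersion": "gateway.example.com/v1",
		"kind":       "RestEndpoint",
		"spec": map[string]interface{}{
			"host": "api.example.com",
			"path": "/orders",
		},
	}
	if host, ok := nestedString(obj, "spec", "host"); ok {
		fmt.Println("route host:", host) // route host: api.example.com
	}
}
```

In real code you would call unstructured.NestedString(obj.Object, "spec", "host") directly rather than hand-rolling the traversal; the sketch only shows what that helper does under the hood.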

This conceptual implementation provides a strong foundation. Real-world applications would build upon this by adding more sophisticated filtering, robust error handling, metric collection, and detailed business logic within the syncHandler.

Practical Use Cases for Dynamic Informers

The versatility of dynamic informers opens up a myriad of possibilities for building sophisticated and adaptive systems. Here are several practical use cases:

  1. Generic Kubernetes Operators and Controllers:
    • Instead of writing a new operator for every custom resource, a dynamic informer allows you to build a single, generic "meta-operator." This operator could watch for any CRD that adheres to certain conventions (e.g., has a specific label, contains a particular field like status.conditions). It can then apply generic reconciliation logic, such as ensuring all resources are properly tagged, auditing configurations, or enforcing naming conventions, without needing specific Go types for each CRD.
    • Consider a "resource cleaner" operator. It could dynamically discover all resources of a certain type (e.g., ephemeral development resources) based on annotations or age and clean them up, even if those resource types are not known at compile time.
  2. Automated Cloud Resource Management:
    • Beyond Kubernetes, if you have an API that represents cloud resources (e.g., AWS EC2 instances, Azure VMs, GCP Cloud Functions) and you want to manage them in a Kubernetes-native way (e.g., crossplane.io's approach), dynamic informers can monitor CustomResourceDefinitions that define these cloud resources. When a new GCPManagedDatabase CRD is installed, the dynamic informer can detect it, create a watch, and then a controller can provision or manage the actual GCP resource. This bridges the gap between Kubernetes and external cloud APIs seamlessly.
  3. Security and Compliance Enforcement:
    • A dynamic informer can be the backbone of a real-time security scanner. It could watch all newly created or updated resources across the cluster, regardless of their type, and check them against security policies. For example, ensuring that no Ingress resource exposes sensitive internal paths, or that every Deployment has resource limits defined. It can detect misconfigurations across a dynamic array of resource types, providing immediate feedback or automated remediation.
  4. Policy Engines:
    • Policy engines like OPA Gatekeeper or Kyverno already use similar mechanisms internally. A custom policy engine, built with a dynamic informer, could watch for changes across diverse resources and apply custom admission control or mutation policies. For instance, automatically injecting sidecar containers into pods, or ensuring specific labels are present on all services, even for newly introduced CRDs.
  5. Custom Observability and Monitoring Tools:
    • Imagine a tool that collects metrics or logs from any resource type based on its annotations. A dynamic informer would discover all relevant resources, allowing the tool to connect to them (e.g., scraping metrics endpoints, tailing logs) without needing to be explicitly configured for each new resource type. This enables highly adaptable monitoring solutions.
  6. Dynamic API Gateways and Service Discovery:
    • This is a particularly potent use case, directly tying into the keywords api, gateway, and OpenAPI. Consider an API gateway that needs to expose services defined by various teams, each potentially using their own custom resource definitions for service exposure (e.g., a GraphQLService CRD, a RestEndpoint CRD).
    • A dynamic informer can watch for these Service or API definition CRDs. When a new Service CRD is added or updated, the informer triggers a worker. This worker can then parse the unstructured.Unstructured object, extract routing information (e.g., host, path, backendService), and potentially even read embedded OpenAPI specifications.
    • The worker then uses this information to dynamically configure the API gateway (e.g., Nginx, Envoy, Kong) to expose the new API. This eliminates manual configuration of the gateway whenever a new service is deployed or an existing one changes its OpenAPI definition.
    • Platforms like APIPark, an open-source AI gateway and API management platform, thrive on such dynamic capabilities. By leveraging dynamic informers, APIPark could automatically discover new AI models or REST services defined as custom resources and integrate them into its unified API management system. This enables quick integration of 100+ AI models, prompt encapsulation into REST APIs, and end-to-end API lifecycle management without constant manual intervention, dramatically reducing the effort of managing a diverse, OpenAPI-based API landscape.

These examples highlight how dynamic informers serve as a foundational building block for creating sophisticated, self-configuring, and self-healing systems in modern cloud-native architectures.

Advanced Considerations for Production-Ready Dynamic Informers

While the core concepts and implementation blueprint provide a solid start, building a dynamic informer for production environments necessitates addressing several advanced considerations. These factors ensure the system is not only functional but also resilient, scalable, and secure.

1. Robust Filtering and GVR Management

The shouldWatchGVR function is arguably the most critical part of a dynamic informer. In production, a simple hardcoded filter is insufficient. You'll need:

  • Configurable Whitelists/Blacklists: Allow operators to specify exact GVRs or GVR patterns (e.g., *.example.com/*/*) to include or exclude.
  • Label/Annotation-based Filtering: Watch resources only if they have specific labels or annotations. This is incredibly powerful for opt-in/opt-out behavior.
  • Regex Matching: For more complex matching patterns on GVRs.
  • Namespace Scoping: While NewFilteredDynamicSharedInformerFactory allows namespace filtering, ensure your discovery logic also respects this.
  • Resilience to Discovery Errors: What happens if ServerGroupsAndResources() temporarily fails? The system should not crash but retry with appropriate backoff.
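As a sketch of what such configurable filtering might look like (the gvrFilter type and its glob patterns are illustrative, not part of client-go), allow/deny patterns over group/version/resource keys can be matched per segment with the standard library's path.Match:

```go
package main

import (
	"fmt"
	"path"
)

// gvrFilter is a hypothetical, configurable filter for shouldWatchGVR.
// Patterns are shell-style globs over "group/version/resource"; path.Match
// treats '/' as a separator, so '*' matches within a single segment.
// Deny rules take precedence over allow rules.
type gvrFilter struct {
	allow []string
	deny  []string
}

func (f gvrFilter) shouldWatch(group, version, resource string) bool {
	key := group + "/" + version + "/" + resource
	for _, p := range f.deny {
		if ok, _ := path.Match(p, key); ok {
			return false
		}
	}
	for _, p := range f.allow {
		if ok, _ := path.Match(p, key); ok {
			return true
		}
	}
	return false // default deny: watch nothing unless explicitly allowed
}

func main() {
	f := gvrFilter{
		allow: []string{"*.example.com/*/*"},
		deny:  []string{"*.example.com/*/secrets"},
	}
	fmt.Println(f.shouldWatch("gateway.example.com", "v1", "restendpoints")) // true
	fmt.Println(f.shouldWatch("apps", "v1", "deployments"))                  // false
}
```

Defaulting to deny keeps RBAC exposure and informer memory bounded; new GVRs only get watched when an operator opts them in.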

2. Event Processing Semantics and Idempotency

  • Work Queue Depth and Rate Limiting: Properly configure the workqueue.RateLimitingInterface to prevent flooding your workers and the API server during bursts of events. workqueue.DefaultControllerRateLimiter() is a good start, but tune it based on your workload.
  • Idempotent Reconciliation: Your syncHandler must be idempotent. This means running it multiple times with the same input should produce the same result and have no negative side effects. Informers can deliver duplicate events, or events out of order, so your logic should always reconcile to the desired state, rather than reacting solely to the event type (Add/Update/Delete).
  • State Management: For complex controllers, the local cache of the informer might not be enough. You might need to maintain an internal state machine for each resource you manage, ensuring transitions are valid.
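The difference between reacting to event types and reconciling to desired state can be sketched in a few lines. In this illustrative example (the route map is a stand-in for whatever external state your controller manages), applying the computed changes and running the function again yields no further work, which is exactly what makes duplicate or reordered informer events harmless:

```go
package main

import "fmt"

// reconcile computes the changes needed to make `current` equal `desired`,
// rather than reacting to Add/Update/Delete event types. Running it again
// after the changes are applied produces no work, so the handler is
// idempotent by construction.
func reconcile(current, desired map[string]string) (toApply map[string]string, toDelete []string) {
	toApply = map[string]string{}
	for name, want := range desired {
		if current[name] != want {
			toApply[name] = want // missing or out of date
		}
	}
	for name := range current {
		if _, ok := desired[name]; !ok {
			toDelete = append(toDelete, name) // no longer declared
		}
	}
	return
}

func main() {
	current := map[string]string{"orders": "v1", "stale": "v1"}
	desired := map[string]string{"orders": "v2", "users": "v1"}
	apply, del := reconcile(current, desired)
	fmt.Println(len(apply), len(del)) // 2 1
}
```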

3. Concurrency, Performance, and Resource Consumption

  • Worker Pool Sizing: Determine the optimal number of worker goroutines. Too few, and your controller lags; too many, and you might overload upstream services or exhaust CPU. This often requires empirical testing.
  • CPU and Memory Footprint: Each informer, especially when watching numerous GVRs, consumes memory for its cache. Monitor your application's resource usage carefully. For very large clusters with thousands of CRDs, you might need to be extremely selective about what you watch.
  • Watch Caching and Throttling: The client-go library itself has internal throttling mechanisms, but if your syncHandler makes external API calls (e.g., to cloud providers or other APIs), ensure those calls are also rate-limited or batched to prevent hitting external service limits.
  • Garbage Collection of Stale Informers: As mentioned in the code walkthrough, dynamically stopping individual informers created by NewFilteredDynamicSharedInformerFactory isn't straightforward. For true dynamic GVR lifecycle management (where GVRs can disappear), you might need to periodically stop and restart the entire informer factory, or adopt a more granular informer management approach where each informer runs in its own dedicated goroutine with its own stop channel. This is more complex but offers finer control.

4. Security Considerations

  • RBAC Permissions: The service account running your dynamic informer needs list and watch permissions for all the GVRs it intends to monitor. For dynamic.Interface, this often means broad get, list, watch permissions on */* (all groups, all resources), which is powerful and must be carefully managed. Restrict these permissions as much as possible to the specific GVRs your controller needs to operate on. If it also performs write operations (create, update, delete), those must be explicitly granted.
  • Least Privilege: Adhere strictly to the principle of least privilege. Only grant the necessary permissions. Avoid giving cluster-admin roles unless absolutely unavoidable and well-justified.
  • Data Exposure: unstructured.Unstructured objects contain all fields of a resource. Be mindful of sensitive data that might be present in resources and ensure your logging and processing logic doesn't inadvertently expose it.

5. Observability and Troubleshooting

  • Comprehensive Logging: Use klog or a similar structured logging library. Log key events: informer starts/stops, GVR discovery/un-discovery, work queue adds/gets/errors, and especially details during syncHandler execution.
  • Metrics: Expose Prometheus metrics for:
    • Work queue depth and processing time.
    • Number of GVRs being watched.
    • Number of OnAdd, OnUpdate, OnDelete events processed per GVR.
    • Errors encountered during discovery or event processing.
  • Tracing: Integrate with a distributed tracing system (e.g., OpenTelemetry) to track the flow of an event from its ingestion by the informer through the work queue to its final processing in the syncHandler.
  • Health Checks: Provide standard /healthz and /readyz endpoints for Kubernetes probes. A dynamic informer is "ready" only after its initial set of informers have synced their caches.

6. Integration with API Management and External Systems

  • When dynamic informers are used to feed information into an API gateway or an API management platform, the integration point must be robust. This involves:
    • External API Calls: Your syncHandler might call the API gateway's configuration API to add, update, or remove routes. These calls need proper error handling, retries, and potentially circuit breakers.
    • Configuration Rollbacks: If updating the API gateway fails, how do you handle rollback or reconcile the discrepancy? The API gateway itself should have mechanisms for transactional updates.
    • Schema Evolution: If your dynamic informer is watching OpenAPI definitions, ensure your processing logic can gracefully handle OpenAPI schema evolution (e.g., new fields, deprecated fields).

By meticulously addressing these advanced considerations, developers can transform a basic dynamic informer concept into a production-grade component capable of powering highly resilient, scalable, and secure automation within complex cloud-native environments.

Table: Comparing Static vs. Dynamic Informers

To solidify the understanding of when to use each type of informer, let's summarize their key characteristics:

| Feature | Static Informer | Dynamic Informer |
| --- | --- | --- |
| Resource type knowledge | Known at compile time (generated Go types) | Discovered at runtime (operates on unstructured data) |
| Go type usage | Specific Go structs (e.g., corev1.Pod) | unstructured.Unstructured (generic map[string]interface{}) |
| Primary client | *kubernetes.Clientset (typed clients) | dynamic.Interface (dynamic client) |
| Discovery | Implicit, via generated client sets | Explicit, via discovery.DiscoveryInterface at runtime |
| Flexibility | Low; tightly coupled to specific API objects | High; adapts to any API resource based on its GVR |
| Use cases | Standard Kubernetes controllers (Pods, Deployments) | Generic operators, meta-controllers, dynamic API gateway configs, security policy engines, watching unknown CRDs |
| Complexity | Relatively lower; type-safe operations | Higher; requires careful handling of unstructured data and more complex GVR management |
| Performance | Generally very efficient; direct struct access | Efficient at watching, but processing unstructured data carries slight overhead versus direct struct access |
| Code maintenance | Requires code generation and recompilation for new types | Highly adaptable; minimal code changes for new resource types |
| Error handling | Type-checked errors; easier to reason about | Runtime type assertions; more potential for runtime errors if unstructured data is parsed incorrectly |

This table clearly highlights that while static informers are excellent for well-defined and stable resource sets, dynamic informers offer unparalleled flexibility for systems that need to adapt to an evolving API surface.

Conclusion

The ability to build a dynamic informer to watch multiple resources in Go is a cornerstone skill for anyone developing sophisticated automation and control planes in the cloud-native ecosystem. We've explored the fundamental limitations of static informers, understood the compelling need for dynamic discovery, and delved into the architectural principles and a conceptual Go implementation that leverages client-go's dynamic and discovery capabilities.

From generic Kubernetes operators that can manage any custom resource conforming to a pattern, to powerful policy enforcement engines, and especially to self-configuring API gateways that seamlessly integrate new API services, dynamic informers empower developers to build systems that are inherently more resilient, extensible, and less brittle to changes in the underlying infrastructure. By effectively utilizing components like the dynamic client, discovery client, and dynamic informer factory, and by carefully considering aspects like filtering, concurrency, and security, one can construct highly adaptive solutions that transcend the limitations of compile-time resource definitions.

Platforms that manage a rapidly growing and diverse set of APIs, such as APIPark, an open-source AI gateway and API management platform, stand to gain immense value from such dynamic resource watching. The capability to automatically discover new API definitions, whether they're standard services or custom resources encapsulating AI model prompts, and integrate them into a unified API format based on OpenAPI specifications, fundamentally streamlines API lifecycle management. This dynamism is not just a technical feature; it's a strategic advantage, enabling faster innovation and reducing operational overhead in complex, API-driven environments. As cloud-native architectures continue to embrace custom resources and dynamic configuration, mastering the art of dynamic informers in Go will remain an invaluable skill for building the next generation of intelligent and adaptive systems.


Frequently Asked Questions (FAQ)

1. What is the primary difference between a static informer and a dynamic informer? A static informer is compiled with knowledge of specific Go types for Kubernetes resources (e.g., Pod, Deployment), generating type-safe client methods. It can only watch resources for which its types were generated. A dynamic informer, conversely, discovers resource types at runtime using the DiscoveryClient and operates on generic unstructured.Unstructured objects, allowing it to watch any resource (including unknown Custom Resources) without prior compile-time knowledge of its Go type.

2. Why would I need a dynamic informer if I can just generate Go types for all my CRDs? While generating Go types works for known CRDs, a dynamic informer becomes necessary when: 1) the CRDs you need to watch are not known at compile time (e.g., they are installed by third parties post-deployment); 2) you're building a generic tool or "meta-controller" that needs to operate on any resource adhering to a pattern, rather than specific types; or 3) you want your system to be resilient to new API versions or entirely new CRDs being introduced without requiring recompilation and redeployment.

3. What are the main client-go components used to build a dynamic informer? The core components are:
  • dynamic.Interface (dynamic client): for interacting with resources as unstructured.Unstructured objects.
  • discovery.DiscoveryInterface (discovery client): for querying the Kubernetes API server to find the available GroupVersionResources (GVRs).
  • dynamicinformer.NewFilteredDynamicSharedInformerFactory: for creating informers that operate on unstructured data.
  • cache.ResourceEventHandler: for defining event handling logic (Add, Update, Delete) when resources change.

4. How does a dynamic informer handle resources that are removed from the cluster or new ones added? A well-designed dynamic informer system periodically re-runs its discovery loop using the discoveryClient. During this process, it identifies any new GroupVersionResources (GVRs) that match its filtering criteria and creates new informers for them. Conversely, if a GVR that was previously watched is no longer reported by the API server, the system should ideally stop and clean up the corresponding informer (though gracefully stopping individual informers from a SharedInformerFactory can be complex and might require manual management or restarting the factory).

5. What are some real-world applications of dynamic informers, especially concerning API management? Dynamic informers are crucial for:
  • Generic Kubernetes operators: building single operators that can manage a wide range of CRDs based on common patterns or labels.
  • Policy engines: enforcing security or operational policies across diverse, dynamically created resources.
  • Dynamic API gateways: automatically discovering new API services (e.g., microservices defined by CRDs, or OpenAPI specifications) and configuring an API gateway to expose them without manual intervention. For example, platforms like APIPark could leverage dynamic informers to rapidly integrate and manage new AI models or REST services as they are deployed, keeping their unified API format and gateway configuration up to date.

🚀 You can securely and efficiently call the OpenAI API on APIPark in just two steps:

Step 1: Deploy the APIPark AI gateway in 5 minutes.

APIPark is developed based on Golang, offering strong product performance and low development and maintenance costs. You can deploy APIPark with a single command line.

curl -sSO https://download.apipark.com/install/quick-start.sh; bash quick-start.sh
APIPark Command Installation Process

In my experience, you can see the successful deployment interface within 5 to 10 minutes. Then, you can log in to APIPark using your account.

APIPark System Interface 01

Step 2: Call the OpenAI API.

APIPark System Interface 02