package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"strconv"
	"strings"
	"time"

	"golang.org/x/crypto/ssh"
	"gopkg.in/yaml.v3"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	metrics "k8s.io/metrics/pkg/client/clientset/versioned"
)

// Node is one entry of the node pool file.
//
// Status is "online" or "offline". Role is "worker", "microk8s-controlplane"
// or "none". Cluster is the cluster name, or "none" once the node has been
// deactivated. CPU and Memory hold integer utilization percentages of the
// node's capacity (see updatePerNodeUtilization). LastActive is an RFC 3339
// timestamp.
type Node struct {
	Name       string `yaml:"name" json:"name"`
	IP         string `yaml:"ip" json:"ip"`
	Status     string `yaml:"status" json:"status"`
	CPU        int    `yaml:"cpu" json:"cpu"`
	Memory     int    `yaml:"memory" json:"memory"`
	Role       string `yaml:"role" json:"role"`
	Cluster    string `yaml:"cluster" json:"cluster"`
	LastActive string `yaml:"last_active" json:"last_active"`
}

// NodePool is the top-level document stored in the node pool file.
type NodePool struct {
	Nodes []Node `yaml:"nodes" json:"nodes"`
}

// mustIntEnv returns the integer value of the environment variable name.
// It returns def when the variable is unset or empty, and also when it does
// not parse as an integer (in which case a warning is logged so that a
// misconfiguration is not silently ignored).
func mustIntEnv(name string, def int) int {
	val := os.Getenv(name)
	if val == "" {
		return def
	}
	i, err := strconv.Atoi(val)
	if err != nil {
		log.Printf("ignoring invalid integer %q for %s, using default %d", val, name, def)
		return def
	}
	return i
}

// loadPool reads file and decodes it as a YAML NodePool.
func loadPool(file string) (*NodePool, error) {
	data, err := ioutil.ReadFile(file)
	if err != nil {
		return nil, err
	}
	var pool NodePool
	if err := yaml.Unmarshal(data, &pool); err != nil {
		return nil, err
	}
	return &pool, nil
}

// savePool encodes pool as YAML and writes it to file with mode 0644.
func savePool(file string, pool *NodePool) error {
	data, err := yaml.Marshal(pool)
	if err != nil {
		return err
	}
	return ioutil.WriteFile(file, data, 0644)
}

// initNodePool syncs the node pool file with the real nodes in the cluster
// and returns the resulting pool.
//
// A pool file that cannot be loaded is treated as an empty pool. Every node
// currently in the cluster gets an entry; if the node already had an entry,
// its status, timestamp, utilization, cluster and role are carried over while
// its IP is refreshed from the cluster. Entries for nodes that are NOT in the
// cluster are kept unchanged: those are the spare machines that
// ensureControlPlanes and activateOneWorker pick from, so dropping them would
// leave nothing to scale up with.
//
// It returns an error when the nodes cannot be listed or the pool file cannot
// be written.
func initNodePool(clientset *kubernetes.Clientset, poolFile string) (*NodePool, error) {
	ctx := context.Background()

	// Load existing pool, if it exists.
	pool, err := loadPool(poolFile)
	if err != nil {
		log.Println("Failed to load existing node pool, starting fresh:", err)
		pool = &NodePool{}
	}

	// Get current nodes from Kubernetes.
	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to list nodes: %w", err)
	}

	// Index the previous entries by name for constant-time lookup.
	known := make(map[string]*Node, len(pool.Nodes))
	for i := range pool.Nodes {
		known[pool.Nodes[i].Name] = &pool.Nodes[i]
	}

	updated := make([]Node, 0, len(nodes.Items)+len(pool.Nodes))
	inCluster := make(map[string]struct{}, len(nodes.Items))

	for i := range nodes.Items {
		n := &nodes.Items[i]
		inCluster[n.Name] = struct{}{}

		role := "worker"
		if isControlPlane(n) {
			role = "microk8s-controlplane"
		}
		node := Node{
			Name:       n.Name,
			IP:         nodeIP(n),
			Status:     "online",
			Role:       role,
			Cluster:    os.Getenv("CLUSTER_NAME"),
			CPU:        0, // will be updated by metrics
			Memory:     0, // will be updated by metrics
			LastActive: time.Now().Format(time.RFC3339),
		}

		// If this node existed in the old pool, preserve its recorded state.
		if old, ok := known[n.Name]; ok {
			node.Status = old.Status
			node.LastActive = old.LastActive
			node.CPU = old.CPU
			node.Memory = old.Memory
			node.Cluster = old.Cluster
			node.Role = old.Role
		}
		updated = append(updated, node)
	}

	// Keep pool entries that are not part of the cluster (spare nodes).
	for i := range pool.Nodes {
		if _, ok := inCluster[pool.Nodes[i].Name]; !ok {
			updated = append(updated, pool.Nodes[i])
		}
	}
	pool.Nodes = updated

	// Save back to file.
	if err := savePool(poolFile, pool); err != nil {
		return nil, fmt.Errorf("failed to save node pool: %w", err)
	}

	log.Printf("Initialized node pool with %d nodes", len(pool.Nodes))
	return pool, nil
}

// runSSH runs cmd on host (port 22) as user with password authentication and
// returns the combined stdout/stderr output. The dial timeout is 10 seconds.
//
// NOTE(review): ssh.InsecureIgnoreHostKey disables host key verification,
// which exposes the password and the join token to man-in-the-middle attacks.
// Replace it with a known_hosts based callback when host keys are available.
func runSSH(host, user, pass, cmd string) (string, error) {
	config := &ssh.ClientConfig{
		User:            user,
		Auth:            []ssh.AuthMethod{ssh.Password(pass)},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         10 * time.Second,
	}

	client, err := ssh.Dial("tcp", fmt.Sprintf("%s:22", host), config)
	if err != nil {
		return "", err
	}
	defer client.Close()

	session, err := client.NewSession()
	if err != nil {
		return "", err
	}
	defer session.Close()

	out, err := session.CombinedOutput(cmd)
	return string(out), err
}

// isControlPlane reports whether n carries one of the control-plane labels
// (the MicroK8s one or the upstream Kubernetes one).
func isControlPlane(n *v1.Node) bool {
	for _, label := range []string{
		"node.kubernetes.io/microk8s-controlplane",
		"node-role.kubernetes.io/control-plane",
	} {
		if _, ok := n.Labels[label]; ok {
			return true
		}
	}
	return false
}

// nodeIP returns the first InternalIP address of n, or "" if it has none.
func nodeIP(n *v1.Node) string {
	for _, addr := range n.Status.Addresses {
		if addr.Type == v1.NodeInternalIP {
			return addr.Address
		}
	}
	return ""
}

// parseJoinCommand returns the first line of out that, after trimming
// surrounding whitespace, starts with "microk8s join". It returns "" when no
// such line exists.
func parseJoinCommand(out string) string {
	for _, line := range strings.Split(out, "\n") {
		if trimmed := strings.TrimSpace(line); strings.HasPrefix(trimmed, "microk8s join") {
			return trimmed
		}
	}
	return ""
}

// updatePerNodeUtilization sets CPU and Memory of every pool entry to the
// node's current usage as an integer percentage of its capacity.
//
// Entries without both a capacity and a metrics sample, or with a zero
// capacity, are left untouched. The pool is only modified in memory.
func updatePerNodeUtilization(
	cs *kubernetes.Clientset,
	ms *metrics.Clientset,
	pool *NodePool,
) error {
	ctx := context.Background()

	nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	metricsList, err := ms.MetricsV1beta1().NodeMetricses().List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}

	// Build lookup tables keyed by node name.
	capCPU := make(map[string]int64, len(nodes.Items))
	capMem := make(map[string]int64, len(nodes.Items))
	for i := range nodes.Items {
		n := &nodes.Items[i]
		capCPU[n.Name] = n.Status.Capacity.Cpu().MilliValue()
		capMem[n.Name] = n.Status.Capacity.Memory().Value()
	}

	usageCPU := make(map[string]int64, len(metricsList.Items))
	usageMem := make(map[string]int64, len(metricsList.Items))
	for i := range metricsList.Items {
		m := &metricsList.Items[i]
		usageCPU[m.Name] = m.Usage.Cpu().MilliValue()
		usageMem[m.Name] = m.Usage.Memory().Value()
	}

	// Update pool entries.
	for i := range pool.Nodes {
		entry := &pool.Nodes[i]
		cpuCap, okCPUCap := capCPU[entry.Name]
		memCap, okMemCap := capMem[entry.Name]
		cpuUse, okCPUUse := usageCPU[entry.Name]
		memUse, okMemUse := usageMem[entry.Name]
		if !okCPUCap || !okMemCap || !okCPUUse || !okMemUse {
			continue
		}
		if cpuCap <= 0 || memCap <= 0 {
			continue
		}
		entry.CPU = int((cpuUse * 100) / cpuCap)
		entry.Memory = int((memUse * 100) / memCap)
	}
	return nil
}

// clusterUtilization returns the cluster-wide CPU and memory usage as integer
// percentages of the summed node capacities.
//
// It returns an error when the nodes or their metrics cannot be listed, or
// when the total CPU or memory capacity is zero (for example when no nodes
// are returned), because no percentage can be computed in that case.
func clusterUtilization(
	cs *kubernetes.Clientset,
	ms *metrics.Clientset,
) (cpuPct int, memPct int, err error) {
	ctx := context.Background()

	nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return 0, 0, err
	}
	metricsList, err := ms.MetricsV1beta1().NodeMetricses().List(ctx, metav1.ListOptions{})
	if err != nil {
		return 0, 0, err
	}

	var (
		totalCPUCap int64
		totalMemCap int64
		totalCPUUse int64
		totalMemUse int64
	)
	for i := range nodes.Items {
		n := &nodes.Items[i]
		totalCPUCap += n.Status.Capacity.Cpu().MilliValue()
		totalMemCap += n.Status.Capacity.Memory().Value()
	}
	for i := range metricsList.Items {
		m := &metricsList.Items[i]
		totalCPUUse += m.Usage.Cpu().MilliValue()
		totalMemUse += m.Usage.Memory().Value()
	}

	// Guard the divisions below: integer division by zero panics.
	if totalCPUCap <= 0 || totalMemCap <= 0 {
		return 0, 0, fmt.Errorf("cluster reports no capacity (cpu=%dm, memory=%d bytes)", totalCPUCap, totalMemCap)
	}

	cpuPct = int((totalCPUUse * 100) / totalCPUCap)
	memPct = int((totalMemUse * 100) / totalMemCap)
	return cpuPct, memPct, nil
}

// ensureControlPlanes activates at most one offline pool node as an
// additional control plane when the cluster has fewer than desired
// control-plane nodes.
//
// Only nodes for which isControlPlane is true are counted, and the join
// command is requested (via "microk8s add-node" over SSH) from the first
// control-plane node that has an internal IP. On success the pool entry is
// updated in memory only; poolFile is not written here.
func ensureControlPlanes(
	cs *kubernetes.Clientset,
	pool *NodePool,
	poolFile, sshUser, sshPass, clusterName string,
	desired int,
) {
	ctx := context.Background()

	nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		log.Println("failed to list nodes:", err)
		return
	}

	cpCount := 0
	var seedIP string
	for i := range nodes.Items {
		n := &nodes.Items[i]
		if !isControlPlane(n) {
			// Workers must not count towards the control-plane quota and
			// cannot be used to generate a join token.
			continue
		}
		cpCount++
		if seedIP == "" {
			seedIP = nodeIP(n)
		}
	}

	if cpCount >= desired {
		return
	}
	if seedIP == "" {
		log.Println("no control-plane node available to generate join command")
		return
	}

	for i := range pool.Nodes {
		n := &pool.Nodes[i]
		if n.Status != "offline" {
			continue
		}

		log.Printf("Attempting to activate node %s as control-plane", n.Name)

		out, err := runSSH(seedIP, sshUser, sshPass, "microk8s add-node")
		if err != nil {
			log.Println("add-node failed:", err)
			return
		}

		joinCmd := parseJoinCommand(out)
		if joinCmd == "" {
			log.Println("no join command found")
			return
		}

		if _, err := runSSH(n.IP, sshUser, sshPass, joinCmd); err != nil {
			log.Println("join failed:", err)
			return
		}

		n.Status = "online"
		n.Cluster = clusterName
		n.Role = "microk8s-controlplane"
		n.LastActive = time.Now().Format(time.RFC3339)

		log.Printf("Control-plane %s activated (%d/%d)", n.Name, cpCount+1, desired)
		// Only one node is activated per call.
		return
	}
}

// activateOneWorker joins the first offline pool node to the cluster as a
// worker.
//
// The join command is requested from the first control-plane node found in
// the cluster and run on the pool node with " --worker" appended. On success
// the pool entry is updated in memory only; poolFile is not written here.
func activateOneWorker(
	cs *kubernetes.Clientset,
	pool *NodePool,
	poolFile, sshUser, sshPass, clusterName string,
) {
	ctx := context.Background()

	var workerNode *Node
	for i := range pool.Nodes {
		if pool.Nodes[i].Status == "offline" {
			workerNode = &pool.Nodes[i]
			break
		}
	}
	if workerNode == nil {
		log.Println("No offline nodes available — skipping activation")
		return
	}

	log.Printf("Attempting to activate %s node as worker node", workerNode.Name)

	nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		log.Println("Failed to list nodes:", err)
		return
	}

	var seedIP string
	for i := range nodes.Items {
		if isControlPlane(&nodes.Items[i]) {
			seedIP = nodeIP(&nodes.Items[i])
			break
		}
	}
	if seedIP == "" {
		log.Println("No control-plane node available to generate join command")
		return
	}

	out, err := runSSH(seedIP, sshUser, sshPass, "microk8s add-node")
	if err != nil {
		log.Println("add-node failed:", err)
		return
	}

	joinCmd := parseJoinCommand(out)
	if joinCmd == "" {
		log.Println("No join command found from seed control-plane")
		return
	}
	joinCmd += " --worker"

	if _, err := runSSH(workerNode.IP, sshUser, sshPass, joinCmd); err != nil {
		log.Println("Worker join failed:", err)
		return
	}

	workerNode.Status = "online"
	workerNode.Cluster = clusterName
	workerNode.Role = "worker"
	workerNode.LastActive = time.Now().Format(time.RFC3339)

	log.Printf("Nnode %s successfully activated as workernode", workerNode.Name)
}

// deactivateOneWorker removes the first schedulable non-control-plane node
// from the cluster.
//
// The node is cordoned, told to run "microk8s leave" over SSH, deleted from
// the API server and marked offline in the in-memory pool; the function then
// sleeps for waitSec seconds. If "microk8s leave" fails the node is
// uncordoned again and nothing else is changed. Cordon, uncordon and delete
// failures are logged but do not abort the sequence. poolFile is not written
// here.
func deactivateOneWorker(cs *kubernetes.Clientset, pool *NodePool, poolFile, sshUser, sshPass, clusterName string, waitSec int) {
	ctx := context.Background()

	nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		log.Println("failed to list nodes:", err)
		return
	}

	var workerNode *v1.Node
	for i := range nodes.Items {
		n := &nodes.Items[i]
		if !isControlPlane(n) && !n.Spec.Unschedulable {
			workerNode = n
			break
		}
	}
	if workerNode == nil {
		log.Println("No worker nodes available for deactivation")
		return
	}

	log.Printf("Deactivating node: %s", workerNode.Name)
	ip := nodeIP(workerNode)

	// Cordon the node. Keep the object returned by the API server: it carries
	// the new resourceVersion, which a later Update (the uncordon below) needs
	// to avoid being rejected as a conflicting write.
	current := workerNode
	workerNode.Spec.Unschedulable = true
	if updated, err := cs.CoreV1().Nodes().Update(ctx, workerNode, metav1.UpdateOptions{}); err != nil {
		log.Println("failed to cordon node:", err)
	} else {
		current = updated
	}

	if _, err := runSSH(ip, sshUser, sshPass, "microk8s leave"); err != nil {
		log.Println("microk8s leave failed:", err)

		// Uncordon so node can be retried later.
		current.Spec.Unschedulable = false
		if _, err := cs.CoreV1().Nodes().Update(ctx, current, metav1.UpdateOptions{}); err != nil {
			log.Println("failed to uncordon node:", err)
			return
		}
		log.Println("Node uncordoned to allow retry later")
		return
	}

	if err := cs.CoreV1().Nodes().Delete(ctx, workerNode.Name, metav1.DeleteOptions{}); err != nil {
		log.Println("failed to delete node:", err)
	}

	for i := range pool.Nodes {
		entry := &pool.Nodes[i]
		if entry.Name != workerNode.Name {
			continue
		}
		entry.Status = "offline"
		entry.Cluster = "none"
		entry.Role = "none"
		entry.CPU = 0
		entry.Memory = 0
		entry.LastActive = time.Now().Format(time.RFC3339)
	}

	time.Sleep(time.Duration(waitSec) * time.Second)
}

// startWebGUI starts a simple HTTP server on :8080 in a background goroutine.
//
// GET /status re-reads poolFile on every request and returns it as JSON;
// every other path is served from the static files in /app/web. Handlers are
// registered on http.DefaultServeMux. If the listener fails the process exits
// via log.Fatal.
func startWebGUI(poolFile string) {
	// API endpoint FIRST.
	http.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
		log.Println("Serving /status")

		pool, err := loadPool(poolFile)
		if err != nil {
			log.Println("Cannot load node pool:", err)
			http.Error(w, "cannot load node pool", http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(pool); err != nil {
			// Headers are already sent, so the failure can only be logged.
			log.Println("Cannot encode node pool:", err)
		}
	})

	// Static files LAST (catch-all).
	fs := http.FileServer(http.Dir("/app/web"))
	http.Handle("/", fs)

	go func() {
		log.Println("Web GUI running at :8080")
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()
}

// --- MAIN ---

// main reads its configuration from the environment, starts the web GUI,
// initializes the node pool from the cluster and then runs the control loop.
func main() {
	poolFile := os.Getenv("NODE_LIST_FILE")
	sshUser := os.Getenv("NODE_SSH_USER")
	sshPass := os.Getenv("NODE_SSH_PASS")

	minCPU := mustIntEnv("MIN_CPU", 20)
	maxCPU := mustIntEnv("MAX_CPU", 80)
	minMem := mustIntEnv("MIN_MEM", 30)
	maxMem := mustIntEnv("MAX_MEM", 80)
	desiredCP := mustIntEnv("DESIRED_CONTROL_PLANES", 3)
	waitSec := mustIntEnv("DEACTIVATE_WAIT_SEC", 120)

	clusterName := os.Getenv("CLUSTER_NAME")
	if clusterName == "" {
		log.Fatal("CLUSTER_NAME must be set")
	}

	// 🔎 STARTUP CONFIG DUMP (the SSH password is deliberately not logged).
	log.Println("=== startup configuration ===")
	log.Printf("NODE_LIST_FILE=%q", poolFile)
	log.Printf("NODE_SSH_USER=%q", sshUser)
	log.Printf("MIN_CPU=%d", minCPU)
	log.Printf("MAX_CPU=%d", maxCPU)
	log.Printf("MIN_MEM=%d", minMem)
	log.Printf("MAX_MEM=%d", maxMem)
	log.Printf("DESIRED_CONTROL_PLANES=%d", desiredCP)
	log.Printf("DEACTIVATE_WAIT_SEC=%d", waitSec)
	log.Printf("CLUSTER_NAME=%q", clusterName)
	log.Println("=============================")

	// START WEB GUI
	startWebGUI(poolFile)

	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}

	// Initialize node pool before entering loop. err is already declared
	// above, so plain assignment is required here (":=" would not compile).
	if _, err = initNodePool(clientset, poolFile); err != nil {
		log.Fatal("Failed to initialize node pool:", err)
	}

	// The metrics client only depends on config, so build it once instead of
	// on every loop iteration.
	metricsClient, err := metrics.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}

	for {
		pool, err := loadPool(poolFile)
		if err != nil {
			log.Println("cannot load node pool:", err)
			time.Sleep(30 * time.Second)
			continue
		}

		ensureControlPlanes(clientset, pool, poolFile, sshUser, sshPass, clusterName, desiredCP)

		clusterCPU, clusterMem, err := clusterUtilization(clientset, metricsClient)
		if err != nil {
			log.Println("failed to compute utilization:", err)
			time.Sleep(30 * time.Second)
			continue
		}

		log.Printf("Cluster CPU: %d%%, Mem: