// Command main is a small autoscaler for a MicroK8s cluster. It keeps a YAML
// "node pool" file in sync with the live cluster, makes sure the desired
// number of control-plane nodes exists, joins an offline pool node as a worker
// when cluster utilization is high, and drains and removes a worker when
// utilization is low. A tiny HTTP server exposes the pool as JSON.
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	"os"
	"strconv"
	"strings"
	"time"

	"golang.org/x/crypto/ssh"
	"gopkg.in/yaml.v3"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
	metrics "k8s.io/metrics/pkg/client/clientset/versioned"
)

const (
	// loadPoolAttempts bounds how often loadPool re-reads the pool file
	// before giving up and returning an error to its caller.
	loadPoolAttempts = 5
	// loadPoolRetryDelay is the pause between two loadPool attempts.
	loadPoolRetryDelay = 5 * time.Second

	// drainTimeout bounds how long drainNode waits for pods to go away.
	drainTimeout = 5 * time.Minute
	// drainPollInterval is the pause between two drainNode checks.
	drainPollInterval = 5 * time.Second

	// loopInterval is the pause between two iterations of the control loop.
	loopInterval = 10 * time.Second
)

// Node is one entry of the node pool file. CPU and Memory are utilization
// percentages; Status is "online" or "offline".
type Node struct {
	Name        string  `yaml:"name" json:"name"`
	IP          string  `yaml:"ip" json:"ip"`
	Status      string  `yaml:"status" json:"status"`
	CPU         int     `yaml:"cpu" json:"cpu"`
	Memory      int     `yaml:"memory" json:"memory"`
	Role        string  `yaml:"role" json:"role"`
	Cluster     string  `yaml:"cluster" json:"cluster"`
	LastActive  string  `yaml:"last_active" json:"last_active"`
	Pods        int     `yaml:"pods" json:"pods"`
	Temperature float64 `yaml:"temperature" json:"temperature"`
}

// NodePool is the root document of the node pool file.
type NodePool struct {
	Nodes []Node `yaml:"nodes" json:"nodes"`
}

// StatusResponse is the JSON payload served by the /status endpoint.
type StatusResponse struct {
	ClusterName   string `json:"clusterName"`
	ClusterCPU    int    `json:"clusterCpu"`
	ClusterMemory int    `json:"clusterMemory"`
	Nodes         []Node `json:"nodes"`
}

// --- ENV helpers ---

// mustIntEnv returns the integer value of environment variable name, or def
// when the variable is unset, empty, or not a valid integer. An invalid value
// is logged so that a typo in the deployment does not go unnoticed.
func mustIntEnv(name string, def int) int {
	val := os.Getenv(name)
	if val == "" {
		return def
	}
	i, err := strconv.Atoi(val)
	if err != nil {
		log.Printf("Invalid integer %q for %s, using default %d", val, name, def)
		return def
	}
	return i
}

// mustEnv returns the value of environment variable key, or def when the
// variable is unset or empty.
func mustEnv(key, def string) string {
	if val := os.Getenv(key); val != "" {
		return val
	}
	return def
}

// --- Node pool file ---

// loadPool reads and parses the node pool file. A missing, unparsable, or
// empty pool is retried up to loadPoolAttempts times with loadPoolRetryDelay
// between attempts; after that the last error is returned so that callers
// (including the HTTP handler) are never blocked forever.
func loadPool(file string) (*NodePool, error) {
	var lastErr error
	for attempt := 1; attempt <= loadPoolAttempts; attempt++ {
		if attempt > 1 {
			time.Sleep(loadPoolRetryDelay)
		}
		pool, err := readPool(file)
		if err == nil {
			return pool, nil
		}
		lastErr = err
		log.Printf("Failed to load node pool (attempt %d/%d): %v", attempt, loadPoolAttempts, err)
	}
	return nil, fmt.Errorf("loading node pool %q: %w", file, lastErr)
}

// readPool performs a single read-and-parse of the node pool file. It decodes
// into a fresh NodePool on every call so a failed attempt cannot leave stale
// data behind. An empty pool is reported as an error.
func readPool(file string) (*NodePool, error) {
	data, err := ioutil.ReadFile(file)
	if err != nil {
		return nil, fmt.Errorf("reading file: %w", err)
	}
	var pool NodePool
	if err := yaml.Unmarshal(data, &pool); err != nil {
		return nil, fmt.Errorf("parsing YAML: %w", err)
	}
	if len(pool.Nodes) == 0 {
		return nil, errors.New("node pool is empty")
	}
	return &pool, nil
}

// savePool marshals pool to YAML and writes it to file. It refuses to write a
// nil or empty pool so that a bad in-memory state never wipes the file.
func savePool(file string, pool *NodePool) error {
	if pool == nil {
		return errors.New("refusing to save: pool is nil")
	}
	if len(pool.Nodes) == 0 {
		return errors.New("refusing to save: node pool is empty")
	}
	data, err := yaml.Marshal(pool)
	if err != nil {
		return err
	}
	return ioutil.WriteFile(file, data, 0644)
}

// findPoolNodeByName returns a pointer to the pool entry called name, or nil
// when the pool has no such entry. The pointer aliases pool.Nodes, so changes
// made through it are visible in pool.
func findPoolNodeByName(pool *NodePool, name string) *Node {
	for i := range pool.Nodes {
		if pool.Nodes[i].Name == name {
			return &pool.Nodes[i]
		}
	}
	return nil
}

// --- SSH ---

// runSSH connects to host on port 22 with password authentication, runs cmd,
// and returns its combined stdout and stderr.
//
// NOTE(review): the host key is not verified (ssh.InsecureIgnoreHostKey), so
// the connection is open to man-in-the-middle attacks. Replace it with a
// known_hosts based callback once host keys are distributed.
func runSSH(host, user, pass, cmd string) (string, error) {
	config := &ssh.ClientConfig{
		User:            user,
		Auth:            []ssh.AuthMethod{ssh.Password(pass)},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         10 * time.Second,
	}

	// JoinHostPort brackets IPv6 literals, which a plain "%s:22" does not.
	client, err := ssh.Dial("tcp", net.JoinHostPort(host, "22"), config)
	if err != nil {
		return "", err
	}
	defer client.Close()

	session, err := client.NewSession()
	if err != nil {
		return "", err
	}
	defer session.Close()

	out, err := session.CombinedOutput(cmd)
	return string(out), err
}

// parseJoinCommand extracts the first "microk8s join ..." line from the output
// of "microk8s add-node". It returns "" when no such line exists.
func parseJoinCommand(addNodeOutput string) string {
	for _, line := range strings.Split(addNodeOutput, "\n") {
		line = strings.TrimSpace(line)
		if strings.HasPrefix(line, "microk8s join") {
			return line
		}
	}
	return ""
}

// --- Node helpers ---

// isControlPlane reports whether n carries one of the control-plane labels.
func isControlPlane(n *v1.Node) bool {
	for _, label := range []string{
		"node.kubernetes.io/microk8s-controlplane",
		"node-role.kubernetes.io/control-plane",
	} {
		if _, ok := n.Labels[label]; ok {
			return true
		}
	}
	return false
}

// nodeIP returns the first InternalIP address of n, or "" if it has none.
func nodeIP(n *v1.Node) string {
	for _, addr := range n.Status.Addresses {
		if addr.Type == v1.NodeInternalIP {
			return addr.Address
		}
	}
	return ""
}

// isNodeReady reports whether n has a Ready condition with status True.
func isNodeReady(n *v1.Node) bool {
	for _, cond := range n.Status.Conditions {
		if cond.Type == v1.NodeReady && cond.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}

// controlPlaneInfo counts the control-plane nodes in nodes and returns the
// InternalIP of the first one that has an address (the "seed" used to create
// join tokens). seedIP is "" when no control plane has an InternalIP.
func controlPlaneInfo(nodes []v1.Node) (count int, seedIP string) {
	for i := range nodes {
		if !isControlPlane(&nodes[i]) {
			continue
		}
		count++
		if seedIP == "" {
			seedIP = nodeIP(&nodes[i])
		}
	}
	return count, seedIP
}

// percentOf returns used as an integer percentage of capacity. A non-positive
// capacity yields 0 instead of a division-by-zero panic.
func percentOf(used, capacity int64) int {
	if capacity <= 0 {
		return 0
	}
	return int((used * 100) / capacity)
}

// setUnschedulable cordons (true) or uncordons (false) the named node by
// patching spec.unschedulable.
func setUnschedulable(ctx context.Context, cs *kubernetes.Clientset, nodeName string, unschedulable bool) error {
	patch := []byte(fmt.Sprintf(`{"spec":{"unschedulable":%t}}`, unschedulable))
	_, err := cs.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
	return err
}

// updateNodePool refreshes CPU, Memory, Pods, Temperature, Status, and Role of
// every pool entry whose name matches a node of THIS cluster; pool entries
// that are not part of the cluster are left untouched. The updated pool is
// written back to poolFile.
//
// Missing node metrics are not fatal: the error is logged and CPU/Memory keep
// their previous values.
func updateNodePool(
	clientset *kubernetes.Clientset,
	metricsClient *metrics.Clientset,
	poolFile, sshUser, sshPass, clusterName string,
) error {
	pool, err := loadPool(poolFile)
	if err != nil {
		return fmt.Errorf("cannot load node pool: %w", err)
	}

	ctx := context.Background()

	poolMap := make(map[string]*Node, len(pool.Nodes))
	for i := range pool.Nodes {
		poolMap[pool.Nodes[i].Name] = &pool.Nodes[i]
	}

	clusterNodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("cannot list cluster nodes: %w", err)
	}

	// The metrics list is nil when the call fails, so only walk it on success.
	nodeMetricsMap := map[string]metricsv1beta1.NodeMetrics{}
	nodeMetricsList, err := metricsClient.MetricsV1beta1().NodeMetricses().List(ctx, metav1.ListOptions{})
	if err != nil {
		log.Println("cannot get node metrics:", err)
	} else {
		for _, m := range nodeMetricsList.Items {
			nodeMetricsMap[m.Name] = m
		}
	}

	for i := range clusterNodes.Items {
		n := &clusterNodes.Items[i]
		poolNode := poolMap[n.Name]
		if poolNode == nil {
			continue
		}

		poolNode.Status = "online"
		poolNode.LastActive = time.Now().Format(time.RFC3339)
		poolNode.Cluster = clusterName
		if isControlPlane(n) {
			poolNode.Role = "microk8s-controlplane"
		} else {
			poolNode.Role = "worker"
		}

		// getNodeTemp returns 0 on any failure; keep the last good reading then.
		if temp := getNodeTemp(poolNode, sshUser, sshPass); temp > 0 {
			poolNode.Temperature = temp
		}

		if m, ok := nodeMetricsMap[n.Name]; ok {
			poolNode.CPU = percentOf(m.Usage.Cpu().MilliValue(), n.Status.Capacity.Cpu().MilliValue())
			poolNode.Memory = percentOf(m.Usage.Memory().Value(), n.Status.Capacity.Memory().Value())
		}

		pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{
			FieldSelector: fmt.Sprintf("spec.nodeName=%s", n.Name),
		})
		if err != nil {
			log.Printf("cannot list pods on node %s: %v", n.Name, err)
			continue
		}
		poolNode.Pods = len(pods.Items)
	}

	if err := savePool(poolFile, pool); err != nil {
		return fmt.Errorf("cannot save node pool: %w", err)
	}
	return nil
}

// --- Temperature ---

// getNodeTemp reads the temperature of node over SSH, using the IP stored in
// the pool entry. It first reads /proc/device-tree/model to pick a
// board-specific command (Raspberry Pi or ODROID). It returns 0 when the node
// has no IP, the board is unknown, or any step fails.
func getNodeTemp(node *Node, sshUser, sshPass string) float64 {
	ip := node.IP
	if ip == "" {
		log.Printf("Warning: node %s has no IP in pool, skipping temperature", node.Name)
		return 0
	}

	out, err := runSSH(ip, sshUser, sshPass, "cat /proc/device-tree/model")
	if err != nil {
		// Only the host and the error are logged; credentials must never
		// reach the log.
		log.Printf("getNodeTemp: failed to read model from %s: %v", ip, err)
		return 0
	}

	hw := strings.ToUpper(strings.TrimSpace(out))
	var cmd string
	switch {
	case strings.Contains(hw, "RASPBERRY"):
		cmd = "vcgencmd measure_temp | egrep -o '[0-9]+\\.[0-9]+'"
	case strings.Contains(hw, "ODROID"):
		cmd = "awk '{printf \"%3.1f\", $1/1000}' /sys/class/thermal/thermal_zone0/temp"
	default:
		return 0
	}

	tempStr, err := runSSH(ip, sshUser, sshPass, cmd)
	if err != nil {
		log.Printf("getNodeTemp: failed to read temperature from %s: %v", ip, err)
		return 0
	}

	t, err := strconv.ParseFloat(strings.TrimSpace(tempStr), 64)
	if err != nil {
		return 0
	}
	return t
}

// --- Control-plane management ---

// ensureControlPlanes joins one offline pool node as an additional
// control-plane node when the cluster has fewer than desired control planes.
// At most one node is activated per call. The join token is generated on an
// existing control plane ("seed") over SSH. On success the pool entry is
// marked online and the pool file is saved; every failure is logged and ends
// the call.
func ensureControlPlanes(cs *kubernetes.Clientset, poolFileName, sshUser, sshPass, clusterName string, desired int) {
	pool, err := loadPool(poolFileName)
	if err != nil {
		log.Println("Cannot load pool in ensureControlPlanes:", err)
		return
	}

	nodes, err := cs.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
	if err != nil {
		log.Println("failed to list nodes:", err)
		return
	}

	cpCount, seedIP := controlPlaneInfo(nodes.Items)
	if cpCount >= desired {
		return
	}
	if seedIP == "" {
		log.Println("no available control-plane node found")
		return
	}

	for i := range pool.Nodes {
		n := &pool.Nodes[i]
		// A node without an IP cannot be reached over SSH; try the next one
		// before a join token is generated for nothing.
		if n.Status != "offline" || n.IP == "" {
			continue
		}
		log.Printf("Attempting to activate node %s as control-plane", n.Name)

		out, err := runSSH(seedIP, sshUser, sshPass, "microk8s add-node")
		if err != nil {
			log.Println("add-node failed:", err)
			return
		}
		joinCmd := parseJoinCommand(out)
		if joinCmd == "" {
			log.Println("no join command found")
			return
		}

		if _, err := runSSH(n.IP, sshUser, sshPass, joinCmd); err != nil {
			log.Println("join failed:", err)
			return
		}

		n.Status = "online"
		n.Cluster = clusterName
		n.Role = "microk8s-controlplane"
		n.LastActive = time.Now().Format(time.RFC3339)
		log.Printf("Control-plane %s activated", n.Name)

		if err := savePool(poolFileName, pool); err != nil {
			log.Println("Failed to save pool after activating control-plane:", err)
		}
		return
	}
}

// --- Worker management ---

// activateOneWorker joins the first offline pool node that has an IP to the
// cluster as a worker. It generates a join token on a control plane, runs the
// join on the node over SSH, then waits up to three minutes for the node to
// report Ready. On success the pool entry is marked online and the pool is
// saved to poolFileName. Every failure is logged and ends the call.
func activateOneWorker(
	cs *kubernetes.Clientset,
	pool *NodePool,
	poolFileName, sshUser, sshPass, clusterName string,
) {
	ctx := context.Background()

	var workerNode *Node
	for i := range pool.Nodes {
		if n := &pool.Nodes[i]; n.Status == "offline" && n.IP != "" {
			workerNode = n
			break
		}
	}
	if workerNode == nil {
		log.Println("No offline nodes available — skipping activation")
		return
	}
	log.Printf("Attempting to activate %s node as worker node", workerNode.Name)

	nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		log.Println("Failed to list nodes:", err)
		return
	}
	_, seedIP := controlPlaneInfo(nodes.Items)
	if seedIP == "" {
		log.Println("No control-plane node available to generate join command")
		return
	}

	out, err := runSSH(seedIP, sshUser, sshPass, "microk8s add-node")
	if err != nil {
		log.Println("add-node failed:", err)
		return
	}
	joinCmd := parseJoinCommand(out)
	if joinCmd == "" {
		log.Println("No join command found from seed control-plane")
		return
	}
	joinCmd += " --worker"

	if _, err := runSSH(workerNode.IP, sshUser, sshPass, joinCmd); err != nil {
		log.Println("Worker join failed:", err)
		return
	}

	// Poll until the node shows up in the API and reports Ready.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	timeout := time.After(3 * time.Minute)

WaitReady:
	for {
		select {
		case <-timeout:
			log.Printf("Timeout waiting for node %s to join", workerNode.Name)
			return
		case <-ticker.C:
			current, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
			if err != nil {
				log.Println("Error listing nodes:", err)
				continue
			}
			for i := range current.Items {
				if n := &current.Items[i]; n.Name == workerNode.Name && isNodeReady(n) {
					break WaitReady
				}
			}
		}
	}

	workerNode.Status = "online"
	workerNode.Cluster = clusterName
	workerNode.Role = "worker"
	workerNode.LastActive = time.Now().Format(time.RFC3339)
	log.Printf("Node %s successfully activated as worker node", workerNode.Name)

	// Persist the change; the caller's pool is discarded after this call.
	if err := savePool(poolFileName, pool); err != nil {
		log.Printf("Failed to save node pool after activating node %s: %v", workerNode.Name, err)
	}
}

// deactivateOneWorkerSafe removes one worker from the cluster. It picks the
// first schedulable non-control-plane node that also has a pool entry with an
// IP, then: cordons it, drains it, runs "microk8s leave" on it over SSH, waits
// up to waitSec seconds for the Node object to disappear, deletes the Node
// object, runs "microk8s reset" on it, and finally marks the pool entry
// offline and saves the pool to poolFileName.
//
// If draining or leaving fails the node is uncordoned again so it does not
// stay unschedulable forever.
func deactivateOneWorkerSafe(cs *kubernetes.Clientset, pool *NodePool, poolFileName, sshUser, sshPass, clusterName string, waitSec int) {
	ctx := context.Background()

	// 1. List nodes in the cluster.
	nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		log.Printf("Cannot list cluster nodes: %v", err)
		return
	}

	// 2. Pick the first schedulable worker that is managed by the pool.
	var (
		workerNode *v1.Node
		poolNode   *Node
	)
	for i := range nodes.Items {
		n := &nodes.Items[i]
		if isControlPlane(n) || n.Spec.Unschedulable {
			continue
		}
		candidate := findPoolNodeByName(pool, n.Name)
		if candidate == nil {
			log.Printf("No pool node entry found for %s", n.Name)
			continue
		}
		if candidate.IP == "" {
			log.Printf("Pool node %s has no IP, cannot deactivate it", n.Name)
			continue
		}
		workerNode, poolNode = n, candidate
		break
	}
	if workerNode == nil {
		log.Println("No worker nodes available for deactivation")
		return
	}
	name := workerNode.Name
	log.Printf("Deactivating node: %s", name)

	// uncordon undoes step 3 when a later step fails.
	uncordon := func() {
		if err := setUnschedulable(ctx, cs, name, false); err != nil {
			log.Printf("Failed to uncordon node %s: %v", name, err)
		}
	}

	// 3. Cordon.
	if err := setUnschedulable(ctx, cs, name, true); err != nil {
		log.Printf("Failed to cordon node %s: %v", name, err)
		return
	}
	log.Printf("Node %s cordoned successfully", name)

	// 4. Drain pods.
	if err := drainNode(cs, name); err != nil {
		log.Printf("Drain failed for node %s: %v. Uncordoning node", name, err)
		uncordon()
		return
	}
	log.Printf("Node %s drained successfully", name)

	// 5. Make the node leave the cluster. "|| true" hides the command's own
	// exit status, so an error here means the SSH connection itself failed.
	if _, err := runSSH(poolNode.IP, sshUser, sshPass, "microk8s leave || true"); err != nil {
		log.Printf("microk8s leave failed on node %s (%s): %v", name, poolNode.IP, err)
		uncordon()
		return
	}
	log.Printf("Node %s leave executed successfully", name)

	// 6. Wait for the node to disappear from the Kubernetes API.
	timeout := time.After(time.Duration(waitSec) * time.Second)
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

WaitLoop:
	for {
		select {
		case <-timeout:
			log.Printf("Timeout waiting for node %s to leave the cluster", name)
			break WaitLoop
		case <-ticker.C:
			if _, err := cs.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{}); err != nil {
				// Any error is taken to mean the node is gone.
				break WaitLoop
			}
			log.Printf("Waiting for node %s membership to converge...", name)
		}
	}

	// 7. Delete the Kubernetes Node object (best effort).
	if err := cs.CoreV1().Nodes().Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
		log.Printf("Failed to delete node %s from Kubernetes API: %v", name, err)
	} else {
		log.Printf("Node %s deleted from Kubernetes API", name)
	}

	// 8. Reset the node locally after it left the cluster (best effort).
	if _, err := runSSH(poolNode.IP, sshUser, sshPass, "microk8s reset || true"); err != nil {
		log.Printf("microk8s reset failed on node %s (%s): %v", name, poolNode.IP, err)
	} else {
		log.Printf("Node %s reset successfully", name)
	}

	// 9. Update the node pool file.
	poolNode.Status = "offline"
	poolNode.Cluster = "none"
	poolNode.Role = "none"
	poolNode.CPU = 0
	poolNode.Memory = 0
	poolNode.Pods = 0
	poolNode.Temperature = -1
	poolNode.LastActive = time.Now().Format(time.RFC3339)

	if err := savePool(poolFileName, pool); err != nil {
		log.Printf("Failed to save node pool after deactivating node %s: %v", name, err)
		return
	}
	log.Printf("Node-pool file updated: node %s marked offline", name)
}

// --- Drain node ---

// isDrainable reports whether drainNode should delete pod and wait for it.
// Mirror (static) pods, kube-system pods, and DaemonSet-owned pods are left
// alone: they are either not deletable through the API or are recreated on
// the same node right away.
func isDrainable(pod *v1.Pod) bool {
	if _, mirror := pod.Annotations["kubernetes.io/config.mirror"]; mirror {
		return false
	}
	if pod.Namespace == "kube-system" {
		return false
	}
	for _, ref := range pod.OwnerReferences {
		if ref.Kind == "DaemonSet" {
			return false
		}
	}
	return true
}

// drainNode deletes every drainable pod (see isDrainable) scheduled on
// nodeName with a 60 second grace period, then polls until none of them is
// left without a deletion timestamp. It returns an error when pods cannot be
// listed or when drainTimeout elapses first.
//
// NOTE(review): pods are removed with Delete, not the Eviction API, so
// PodDisruptionBudgets are not honored.
func drainNode(cs *kubernetes.Clientset, nodeName string) error {
	ctx := context.Background()
	onNode := metav1.ListOptions{
		FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
	}

	pods, err := cs.CoreV1().Pods("").List(ctx, onNode)
	if err != nil {
		return fmt.Errorf("listing pods on node %s: %w", nodeName, err)
	}

	grace := int64(60)
	for i := range pods.Items {
		pod := &pods.Items[i]
		if !isDrainable(pod) {
			continue
		}
		err := cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{
			GracePeriodSeconds: &grace,
		})
		if err != nil {
			// Best effort: the pod may already be gone. If it is not, the
			// wait loop below keeps counting it and eventually times out.
			log.Printf("drainNode: cannot delete pod %s/%s: %v", pod.Namespace, pod.Name, err)
		}
	}

	deadline := time.Now().Add(drainTimeout)
	for {
		remaining, err := cs.CoreV1().Pods("").List(ctx, onNode)
		if err != nil {
			return fmt.Errorf("listing remaining pods on node %s: %w", nodeName, err)
		}

		active := 0
		for i := range remaining.Items {
			if p := &remaining.Items[i]; isDrainable(p) && p.DeletionTimestamp == nil {
				active++
			}
		}
		if active == 0 {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out after %s with %d pods still active on node %s", drainTimeout, active, nodeName)
		}
		time.Sleep(drainPollInterval)
	}
}

// --- Cluster utilization ---

// clusterUtilization returns the cluster-wide CPU and memory usage as integer
// percentages: the sum of all node usage from the metrics API divided by the
// sum of all node capacity. It returns an error when nodes or metrics cannot
// be listed, or when the cluster reports no capacity.
func clusterUtilization(cs *kubernetes.Clientset, ms *metrics.Clientset) (cpuPct int, memPct int, err error) {
	ctx := context.Background()

	nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return 0, 0, err
	}
	metricsList, err := ms.MetricsV1beta1().NodeMetricses().List(ctx, metav1.ListOptions{})
	if err != nil {
		return 0, 0, err
	}

	var totalCPUCap, totalMemCap, totalCPUUse, totalMemUse int64
	for i := range nodes.Items {
		capacity := nodes.Items[i].Status.Capacity
		totalCPUCap += capacity.Cpu().MilliValue()
		totalMemCap += capacity.Memory().Value()
	}
	for i := range metricsList.Items {
		usage := metricsList.Items[i].Usage
		totalCPUUse += usage.Cpu().MilliValue()
		totalMemUse += usage.Memory().Value()
	}

	// Without capacity a percentage is meaningless (and would divide by zero).
	if totalCPUCap <= 0 || totalMemCap <= 0 {
		return 0, 0, errors.New("cluster reports no CPU or memory capacity")
	}

	return percentOf(totalCPUUse, totalCPUCap), percentOf(totalMemUse, totalMemCap), nil
}

// --- Web GUI ---

// startWebGUI registers the /status JSON endpoint and a static file server
// for /app/web on the default mux, then serves them on :8080 in a background
// goroutine. A listener failure terminates the process.
func startWebGUI(poolFileName string, clientset *kubernetes.Clientset, metricsClient *metrics.Clientset, clusterName string) {
	http.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
		pool, err := loadPool(poolFileName)
		if err != nil {
			log.Println("Cannot load node pool:", err)
			http.Error(w, "cannot load node pool", http.StatusInternalServerError)
			return
		}

		// Utilization is best effort: on error the node list is still served,
		// with both percentages reported as 0.
		cpuPct, memPct, err := clusterUtilization(clientset, metricsClient)
		if err != nil {
			log.Println("clusterUtilization error:", err)
		}

		resp := StatusResponse{
			ClusterName:   clusterName,
			ClusterCPU:    cpuPct,
			ClusterMemory: memPct,
			Nodes:         pool.Nodes,
		}
		log.Printf(
			"StatusResponse: cluster=%q cpu=%d mem=%d nodes=%d\n",
			resp.ClusterName, resp.ClusterCPU, resp.ClusterMemory, len(pool.Nodes),
		)

		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(resp); err != nil {
			log.Println("Cannot encode status response:", err)
		}
	})

	// Serve static files.
	http.Handle("/", http.FileServer(http.Dir("/app/web")))

	go func() {
		log.Println("Web GUI running at :8080")
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()
}

// --- Main ---

// main reads the configuration from the environment, creates the in-cluster
// Kubernetes and metrics clients, starts the web GUI, and runs the control
// loop forever.
func main() {
	poolFileName := mustEnv("NODE_POOL_FILE_NAME", "node-pool.yaml")
	sshUser := mustEnv("NODE_SSH_USER", "ubuntu")
	sshPass := mustEnv("NODE_SSH_PASS", "")

	// Scaling thresholds, in percent.
	minCPU := mustIntEnv("MIN_CPU", 40)
	minMem := mustIntEnv("MIN_MEM", 50)
	maxCPU := mustIntEnv("MAX_CPU", 90)
	maxMem := mustIntEnv("MAX_MEM", 90)

	clusterName := mustEnv("CLUSTER_NAME", "clustermain")
	desiredCP := mustIntEnv("DESIRED_CONTROLPLANES", 1)

	// Kubernetes clients.
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatalf("Cannot create in-cluster config: %v", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Cannot create clientset: %v", err)
	}
	metricsClient, err := metrics.NewForConfig(config)
	if err != nil {
		log.Fatalf("Cannot create metrics client: %v", err)
	}

	startWebGUI(poolFileName, clientset, metricsClient, clusterName)

	// Main control loop.
	for {
		// Update ONLY nodes that exist in THIS cluster.
		if err := updateNodePool(clientset, metricsClient, poolFileName, sshUser, sshPass, clusterName); err != nil {
			log.Printf("updateNodePool error: %v", err)
		}

		// Ensure control-plane count.
		ensureControlPlanes(clientset, poolFileName, sshUser, sshPass, clusterName, desiredCP)

		// Without reliable utilization figures no scaling decision is made;
		// otherwise a metrics outage (0%/0%) would look like an idle cluster
		// and remove a worker on every iteration.
		cpuPct, memPct, err := clusterUtilization(clientset, metricsClient)
		if err != nil {
			log.Printf("clusterUtilization error, skipping scaling decisions: %v", err)
			time.Sleep(loopInterval)
			continue
		}
		log.Printf("Cluster utilization: CPU %d%%, MEM %d%%", cpuPct, memPct)

		// Load the pool only now, after the two steps above have rewritten
		// the file, so a save during scaling cannot overwrite their updates
		// with stale data.
		pool, err := loadPool(poolFileName)
		if err != nil {
			log.Println("Cannot load node pool:", err)
			time.Sleep(loopInterval)
			continue
		}

		// At most one scaling action per iteration; scale-up wins if the
		// thresholds are misconfigured so that both conditions hold.
		switch {
		case cpuPct > maxCPU || memPct > maxMem:
			activateOneWorker(clientset, pool, poolFileName, sshUser, sshPass, clusterName)
		case cpuPct < minCPU && memPct < minMem:
			deactivateOneWorkerSafe(clientset, pool, poolFileName, sshUser, sshPass, clusterName, 60)
		}

		time.Sleep(loopInterval)
	}
}