// Source listing: 869 lines, 25 KiB, Go.
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
"golang.org/x/crypto/ssh"
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
metrics "k8s.io/metrics/pkg/client/clientset/versioned"
|
|
metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
|
|
"gopkg.in/yaml.v3"
|
|
)
|
|
|
|
// Node is a single machine entry in the node-pool file. The same record is
// written to YAML (the pool file) and to JSON (the /status endpoint).
type Node struct {
	// Name identifies the entry; it is matched against Kubernetes node names.
	Name string `yaml:"name" json:"name"`
	// IP is the address used for SSH connections to the machine.
	IP string `yaml:"ip" json:"ip"`
	// Status is set to "online" or "offline" by this program.
	Status string `yaml:"status" json:"status"`
	// CPU is the node's CPU usage as a percentage of its capacity.
	CPU int `yaml:"cpu" json:"cpu"`
	// Memory is the node's memory usage as a percentage of its capacity.
	Memory int `yaml:"memory" json:"memory"`
	// Role is "worker", "microk8s-controlplane", or "none" when offline.
	Role string `yaml:"role" json:"role"`
	// Cluster is the owning cluster's name, or "none" when offline.
	Cluster string `yaml:"cluster" json:"cluster"`
	// LastActive is an RFC 3339 timestamp of the last status change or refresh.
	LastActive string `yaml:"last_active" json:"last_active"`
	// Pods is the number of pods found on the node.
	Pods int `yaml:"pods" json:"pods"`
	// Temperature is the reading fetched over SSH; -1 once the node is offline.
	Temperature float64 `yaml:"temperature" json:"temperature"`
}
|
|
|
|
type NodePool struct {
|
|
Nodes []Node `yaml:"nodes" json:"nodes"`
|
|
}
|
|
|
|
type StatusResponse struct {
|
|
ClusterName string `json:"clusterName"`
|
|
ClusterCPU int `json:"clusterCpu"`
|
|
ClusterMemory int `json:"clusterMemory"`
|
|
Nodes []Node `json:"nodes"`
|
|
}
|
|
|
|
// --- ENV helpers ---
|
|
// mustIntEnv returns the integer value of environment variable name.
// It falls back to def when the variable is unset, empty, or not a valid
// base-10 integer; despite the name it never panics.
func mustIntEnv(name string, def int) int {
	raw := os.Getenv(name)
	if raw == "" {
		return def
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		return def
	}
	return parsed
}
|
|
|
|
// mustEnv returns the value of environment variable key, or def when the
// variable is unset or empty; despite the name it never panics.
func mustEnv(key, def string) string {
	value := os.Getenv(key)
	if value == "" {
		return def
	}
	return value
}
|
|
|
|
// --- load nodepool and unmarchall it ---
|
|
func loadPool(file string, clusterName string) (*NodePool, error) {
|
|
lockPath := file + ".lock"
|
|
|
|
// --- Wait if someone is writing ---
|
|
for {
|
|
if _, err := os.Stat(lockPath); os.IsNotExist(err) {
|
|
break
|
|
}
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
|
|
var pool NodePool
|
|
|
|
// --- Read the YAML file ---
|
|
data, err := os.ReadFile(file)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read pool file: %w", err)
|
|
}
|
|
|
|
// --- Unmarshal YAML ---
|
|
if err := yaml.Unmarshal(data, &pool); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal YAML: %w", err)
|
|
}
|
|
|
|
if len(pool.Nodes) == 0 {
|
|
return nil, fmt.Errorf("node pool is empty")
|
|
}
|
|
|
|
// --- Determine actual nodes dynamically ---
|
|
actual := map[string]bool{}
|
|
config, err := rest.InClusterConfig()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
|
|
}
|
|
|
|
clientset, err := kubernetes.NewForConfig(config)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create Kubernetes client: %w", err)
|
|
}
|
|
|
|
nodeList, err := clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list nodes: %w", err)
|
|
}
|
|
|
|
for _, n := range nodeList.Items {
|
|
actual[n.Name] = true
|
|
}
|
|
|
|
// --- Reconcile nodes belonging to this cluster ---
|
|
now := time.Now().Format(time.RFC3339)
|
|
for i := range pool.Nodes {
|
|
n := &pool.Nodes[i]
|
|
|
|
if n.Cluster == clusterName && !actual[n.Name] {
|
|
n.Status = "offline"
|
|
n.Cluster = "none"
|
|
n.Role = "none"
|
|
n.CPU = 0
|
|
n.Memory = 0
|
|
n.Pods = 0
|
|
n.Temperature = -1
|
|
n.LastActive = now
|
|
}
|
|
}
|
|
|
|
return &pool, nil
|
|
}
|
|
// --- savenodepool and marchall it ---
|
|
func savePool(file string, pool *NodePool) error {
|
|
if pool == nil {
|
|
return fmt.Errorf("refusing to save: pool is nil")
|
|
}
|
|
if len(pool.Nodes) == 0 {
|
|
return fmt.Errorf("refusing to save: node pool is empty")
|
|
}
|
|
|
|
lockPath := file + ".lock"
|
|
|
|
// Acquire lock
|
|
var lockFile *os.File
|
|
var err error
|
|
for {
|
|
lockFile, err = os.OpenFile(lockPath, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0644)
|
|
if err == nil {
|
|
break
|
|
}
|
|
if os.IsExist(err) {
|
|
time.Sleep(200 * time.Millisecond)
|
|
continue
|
|
}
|
|
return err
|
|
}
|
|
defer func() {
|
|
lockFile.Close()
|
|
os.Remove(lockPath)
|
|
}()
|
|
|
|
// --- Read current disk file directly (NO loadPool call) ---
|
|
var diskPool NodePool
|
|
|
|
data, err := os.ReadFile(file)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := yaml.Unmarshal(data, &diskPool); err != nil {
|
|
return err
|
|
}
|
|
|
|
// --- Merge ---
|
|
for _, updatedNode := range pool.Nodes {
|
|
for i, n := range diskPool.Nodes {
|
|
if n.Name == updatedNode.Name {
|
|
diskPool.Nodes[i] = updatedNode
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Marshal merged pool
|
|
mergedData, err := yaml.Marshal(&diskPool)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Write to temp file
|
|
tmpFile := file + ".tmp"
|
|
f, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := f.Write(mergedData); err != nil {
|
|
f.Close()
|
|
return err
|
|
}
|
|
|
|
f.Sync()
|
|
f.Close()
|
|
|
|
return os.Rename(tmpFile, file)
|
|
}
|
|
|
|
// --- SSH ---
|
|
func runSSH(host, user, pass, cmd string) (string, error) {
|
|
config := &ssh.ClientConfig{
|
|
User: user,
|
|
Auth: []ssh.AuthMethod{ssh.Password(pass)},
|
|
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
client, err := ssh.Dial("tcp", fmt.Sprintf("%s:22", host), config)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer client.Close()
|
|
session, err := client.NewSession()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer session.Close()
|
|
out, err := session.CombinedOutput(cmd)
|
|
return string(out), err
|
|
}
|
|
|
|
// --- Node helpers ---
|
|
func isControlPlane(n *v1.Node) bool {
|
|
if _, ok := n.Labels["node.kubernetes.io/microk8s-controlplane"]; ok {
|
|
return true
|
|
}
|
|
if _, ok := n.Labels["node-role.kubernetes.io/control-plane"]; ok {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func nodeIP(n *v1.Node) string {
|
|
for _, addr := range n.Status.Addresses {
|
|
if addr.Type == v1.NodeInternalIP {
|
|
return addr.Address
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// updateNodePool updates CPU, Memory, Pods, Temperature, Status, and Role
|
|
// 🟧 Only updates nodes that belong to THIS cluster (Golden Rule)
|
|
func updateNodePool(
|
|
clientset *kubernetes.Clientset,
|
|
metricsClient *metrics.Clientset,
|
|
poolFile, sshUser, sshPass, clusterName string,
|
|
) error {
|
|
pool, err := loadPool(poolFile, clusterName)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot load node pool: %w", err)
|
|
}
|
|
|
|
poolMap := map[string]*Node{}
|
|
for i := range pool.Nodes {
|
|
poolMap[pool.Nodes[i].Name] = &pool.Nodes[i]
|
|
}
|
|
|
|
clusterNodes, err := clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("cannot list cluster nodes: %w", err)
|
|
}
|
|
|
|
nodeMetricsList, err := metricsClient.MetricsV1beta1().NodeMetricses().List(context.Background(), metav1.ListOptions{})
|
|
if err != nil {
|
|
log.Println("cannot get node metrics:", err)
|
|
}
|
|
|
|
nodeMetricsMap := map[string]metricsv1beta1.NodeMetrics{}
|
|
for _, m := range nodeMetricsList.Items {
|
|
nodeMetricsMap[m.Name] = m
|
|
}
|
|
|
|
for _, n := range clusterNodes.Items {
|
|
poolNode := poolMap[n.Name]
|
|
if poolNode == nil {
|
|
continue
|
|
}
|
|
|
|
poolNode.Status = "online"
|
|
poolNode.LastActive = time.Now().Format(time.RFC3339)
|
|
poolNode.Cluster = clusterName
|
|
|
|
if isControlPlane(&n) {
|
|
poolNode.Role = "microk8s-controlplane"
|
|
} else {
|
|
poolNode.Role = "worker"
|
|
}
|
|
|
|
temp := getNodeTemp(poolNode, sshUser, sshPass)
|
|
if temp > 0 {
|
|
poolNode.Temperature = temp
|
|
}
|
|
|
|
if m, ok := nodeMetricsMap[n.Name]; ok {
|
|
poolNode.CPU = int((m.Usage.Cpu().MilliValue() * 100) / n.Status.Capacity.Cpu().MilliValue())
|
|
poolNode.Memory = int((m.Usage.Memory().Value() * 100) / n.Status.Capacity.Memory().Value())
|
|
}
|
|
|
|
pods, err := clientset.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
|
|
FieldSelector: fmt.Sprintf("spec.nodeName=%s", n.Name),
|
|
})
|
|
if err == nil {
|
|
poolNode.Pods = len(pods.Items)
|
|
}
|
|
}
|
|
|
|
if err := savePool(poolFile, pool); err != nil {
|
|
return fmt.Errorf("cannot save node pool: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// --- Temperature ---
|
|
// getNodeTemp uses the IP from the node-pool.yaml
|
|
func getNodeTemp(node *Node, sshUser, sshPass string) float64 {
|
|
ip := node.IP
|
|
if ip == "" {
|
|
log.Printf("Warning: node %s has no IP in pool, skipping temperature", node.Name)
|
|
return 0
|
|
}
|
|
|
|
// call your existing SSH logic — unchanged
|
|
out, err := runSSH(ip, sshUser, sshPass, "cat /proc/device-tree/model")
|
|
if err != nil {
|
|
log.Printf("getNodeTemp: failed to read model from %s: %v", ip, sshUser, sshPass, err)
|
|
return 0
|
|
}
|
|
|
|
hw := strings.ToUpper(strings.TrimSpace(out))
|
|
var cmd string
|
|
switch {
|
|
case strings.Contains(hw, "RASPBERRY"):
|
|
cmd = "vcgencmd measure_temp | egrep -o '[0-9]+\\.[0-9]+'"
|
|
case strings.Contains(hw, "ODROID"):
|
|
cmd = "awk '{printf \"%3.1f\", $1/1000}' /sys/class/thermal/thermal_zone0/temp"
|
|
default:
|
|
return 0
|
|
}
|
|
|
|
tempStr, err := runSSH(ip, sshUser, sshPass, cmd)
|
|
if err != nil {
|
|
log.Printf("getNodeTemp: failed to read temperature from %s: %v", ip, err)
|
|
return 0
|
|
}
|
|
|
|
t, err := strconv.ParseFloat(strings.TrimSpace(tempStr), 64)
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
|
|
return t
|
|
}
|
|
|
|
func findPoolNodeByName(pool *NodePool, name string) *Node {
|
|
for i := range pool.Nodes {
|
|
if pool.Nodes[i].Name == name {
|
|
return &pool.Nodes[i]
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// --- Control-plane management ---
|
|
func ensureControlPlanes(cs *kubernetes.Clientset, poolFileName, sshUser, sshPass, clusterName string, desired int) {
|
|
// Load the current node pool from disk
|
|
pool, err := loadPool(poolFileName, clusterName)
|
|
if err != nil {
|
|
log.Println("Cannot load pool in ensureControlPlanes:", err)
|
|
return
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
// List nodes currently in this Kubernetes cluster
|
|
nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
log.Println("failed to list nodes:", err)
|
|
return
|
|
}
|
|
|
|
// Count current control-plane nodes
|
|
cpCount := 0
|
|
var seedIP string
|
|
for _, n := range nodes.Items {
|
|
if isControlPlane(&n) {
|
|
cpCount++
|
|
if seedIP == "" {
|
|
seedIP = nodeIP(&n)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Nothing to do if enough control-planes
|
|
if cpCount >= desired {
|
|
return
|
|
}
|
|
|
|
if seedIP == "" {
|
|
log.Println("no available control-plane node found")
|
|
return
|
|
}
|
|
|
|
// Find an offline node in the pool to activate
|
|
for i := range pool.Nodes {
|
|
n := &pool.Nodes[i]
|
|
if n.Status != "offline" {
|
|
continue
|
|
}
|
|
|
|
log.Printf("Attempting to activate node %s as control-plane", n.Name)
|
|
|
|
// Generate join command from seed control-plane
|
|
out, err := runSSH(seedIP, sshUser, sshPass, "microk8s add-node")
|
|
if err != nil {
|
|
log.Println("add-node failed:", err)
|
|
return
|
|
}
|
|
|
|
var joinCmd string
|
|
for _, line := range strings.Split(out, "\n") {
|
|
if strings.HasPrefix(strings.TrimSpace(line), "microk8s join") {
|
|
joinCmd = strings.TrimSpace(line)
|
|
break
|
|
}
|
|
}
|
|
|
|
if joinCmd == "" {
|
|
log.Println("no join command found")
|
|
return
|
|
}
|
|
|
|
// Run join on offline node
|
|
_, err = runSSH(n.IP, sshUser, sshPass, joinCmd)
|
|
if err != nil {
|
|
log.Println("join failed:", err)
|
|
return
|
|
}
|
|
|
|
// Update node in pool
|
|
n.Status = "online"
|
|
n.Cluster = clusterName
|
|
n.Role = "microk8s-controlplane"
|
|
n.LastActive = time.Now().Format(time.RFC3339)
|
|
log.Printf("Control-plane %s activated", n.Name)
|
|
|
|
// Save back updated pool to disk
|
|
if err := savePool(poolFileName, pool); err != nil {
|
|
log.Println("Failed to save pool after activating control-plane:", err)
|
|
}
|
|
|
|
return
|
|
}
|
|
}
|
|
// --- Worker management ---
|
|
|
|
func activateOneWorker(
|
|
cs *kubernetes.Clientset,
|
|
pool *NodePool,
|
|
poolFileName, sshUser, sshPass, clusterName string,
|
|
) {
|
|
ctx := context.Background()
|
|
var workerNode *Node
|
|
for i := range pool.Nodes {
|
|
n := &pool.Nodes[i]
|
|
if n.Status == "offline" {
|
|
workerNode = n
|
|
break
|
|
}
|
|
}
|
|
|
|
if workerNode == nil {
|
|
log.Println("No offline nodes available — skipping activation")
|
|
return
|
|
}
|
|
|
|
log.Printf("Attempting to activate %s node as worker node", workerNode.Name)
|
|
|
|
// Find a seed control-plane node
|
|
nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
log.Println("Failed to list nodes:", err)
|
|
return
|
|
}
|
|
|
|
var seedIP string
|
|
for _, n := range nodes.Items {
|
|
if isControlPlane(&n) {
|
|
seedIP = nodeIP(&n)
|
|
break
|
|
}
|
|
}
|
|
|
|
if seedIP == "" {
|
|
log.Println("No control-plane node available to generate join command")
|
|
return
|
|
}
|
|
|
|
// Generate join command on seed control-plane
|
|
out, err := runSSH(seedIP, sshUser, sshPass, "microk8s add-node")
|
|
if err != nil {
|
|
log.Println("add-node failed:", err)
|
|
return
|
|
}
|
|
|
|
var joinCmd string
|
|
for _, line := range strings.Split(out, "\n") {
|
|
if strings.HasPrefix(strings.TrimSpace(line), "microk8s join") {
|
|
joinCmd = strings.TrimSpace(line) + " --worker"
|
|
break
|
|
}
|
|
}
|
|
|
|
if joinCmd == "" {
|
|
log.Println("No join command found from seed control-plane")
|
|
return
|
|
}
|
|
|
|
// Run join command on the offline worker node (use pool IP)
|
|
_, err = runSSH(workerNode.IP, sshUser, sshPass, joinCmd)
|
|
if err != nil {
|
|
log.Println("Worker join failed:", err)
|
|
return
|
|
}
|
|
|
|
// Wait for the node to appear in the cluster and become Ready
|
|
tick := time.Tick(5 * time.Second)
|
|
timeout := time.After(3 * time.Minute)
|
|
|
|
Loop:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
log.Printf("Timeout waiting for node %s to join", workerNode.Name)
|
|
return
|
|
case <-tick:
|
|
nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
log.Println("Error listing nodes:", err)
|
|
continue
|
|
}
|
|
for _, n := range nodes.Items {
|
|
if n.Name == workerNode.Name {
|
|
for _, cond := range n.Status.Conditions {
|
|
if cond.Type == v1.NodeReady && cond.Status == v1.ConditionTrue {
|
|
break Loop // Node is Ready → exit
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Mark node as online in pool
|
|
workerNode.Status = "online"
|
|
workerNode.Cluster = clusterName
|
|
workerNode.Role = "worker"
|
|
workerNode.LastActive = time.Now().Format(time.RFC3339)
|
|
log.Printf("Node %s successfully activated as worker node", workerNode.Name)
|
|
}
|
|
|
|
// Deactivate worker node
|
|
func deactivateOneWorkerSafe(cs *kubernetes.Clientset, pool *NodePool, poolFileName, sshUser, sshPass, clusterName string, waitSec int) {
|
|
ctx := context.Background()
|
|
|
|
// 1️⃣ List nodes in the cluster
|
|
nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
log.Printf("Cannot list cluster nodes: %v", err)
|
|
return
|
|
}
|
|
|
|
// 2️⃣ Pick the first worker node that is not cordoned
|
|
var workerNode *v1.Node
|
|
for i := range nodes.Items {
|
|
n := &nodes.Items[i]
|
|
if !isControlPlane(n) && !n.Spec.Unschedulable {
|
|
workerNode = n
|
|
break
|
|
}
|
|
}
|
|
if workerNode == nil {
|
|
log.Println("No worker nodes available for deactivation")
|
|
return
|
|
}
|
|
|
|
log.Printf("Deactivating node: %s", workerNode.Name)
|
|
|
|
// 3️⃣ Find the corresponding node in the pool file
|
|
poolNode := findPoolNodeByName(pool, workerNode.Name)
|
|
if poolNode == nil {
|
|
log.Printf("No pool node entry found for %s", workerNode.Name)
|
|
return
|
|
}
|
|
|
|
// 4️⃣ Cordoning
|
|
patch := []byte(`{"spec":{"unschedulable":true}}`)
|
|
_, err = cs.CoreV1().Nodes().Patch(ctx, workerNode.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
|
|
if err != nil {
|
|
log.Printf("Failed to cordon node %s: %v", workerNode.Name, err)
|
|
return
|
|
}
|
|
log.Printf("Node %s cordoned successfully", workerNode.Name)
|
|
|
|
// 5️⃣ Drain pods
|
|
err = drainNode(cs, workerNode.Name)
|
|
if err != nil {
|
|
log.Printf("Drain failed for node %s: %v. Uncordoning node", workerNode.Name, err)
|
|
patch = []byte(`{"spec":{"unschedulable":false}}`)
|
|
_, _ = cs.CoreV1().Nodes().Patch(ctx, workerNode.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
|
|
return
|
|
}
|
|
log.Printf("Node %s drained successfully", workerNode.Name)
|
|
|
|
// 6️⃣ SSH to the node and make it leave the cluster
|
|
_, err = runSSH(poolNode.IP, sshUser, sshPass, "microk8s leave || true")
|
|
if err != nil {
|
|
log.Printf("microk8s leave failed on node %s (%s): %v", workerNode.Name, poolNode.IP, err)
|
|
return
|
|
}
|
|
log.Printf("Node %s leave executed successfully", workerNode.Name)
|
|
|
|
// 7️⃣ Wait for node to disappear from Kubernetes API
|
|
timeout := time.After(time.Duration(waitSec) * time.Second)
|
|
ticker := time.NewTicker(3 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
WaitLoop:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
log.Printf("Timeout waiting for node %s to leave the cluster", workerNode.Name)
|
|
break WaitLoop
|
|
case <-ticker.C:
|
|
_, err := cs.CoreV1().Nodes().Get(ctx, workerNode.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
// any error assumed node gone
|
|
break WaitLoop
|
|
}
|
|
log.Printf("Waiting for node %s membership to converge...", workerNode.Name)
|
|
}
|
|
}
|
|
|
|
// 8️⃣ Delete the Kubernetes Node object
|
|
err = cs.CoreV1().Nodes().Delete(ctx, workerNode.Name, metav1.DeleteOptions{})
|
|
if err != nil {
|
|
log.Printf("Failed to delete node %s from Kubernetes API: %v", workerNode.Name, err)
|
|
} else {
|
|
log.Printf("Node %s deleted from Kubernetes API", workerNode.Name)
|
|
}
|
|
|
|
// 9️⃣ Optional: reset the node locally after leaving the cluster
|
|
_, err = runSSH(poolNode.IP, sshUser, sshPass, "microk8s reset || true")
|
|
if err != nil {
|
|
log.Printf("microk8s reset failed on node %s (%s): %v", workerNode.Name, poolNode.IP, err)
|
|
} else {
|
|
log.Printf("Node %s reset successfully", workerNode.Name)
|
|
}
|
|
|
|
// 🔟 Update the node-pool file
|
|
poolNode.Status = "offline"
|
|
poolNode.Cluster = "none"
|
|
poolNode.Role = "none"
|
|
poolNode.CPU = 0
|
|
poolNode.Memory = 0
|
|
poolNode.Pods = 0
|
|
poolNode.Temperature = -1
|
|
poolNode.LastActive = time.Now().Format(time.RFC3339)
|
|
|
|
if err := savePool(poolFileName, pool); err != nil {
|
|
log.Printf("Failed to save node pool after deactivating node %s: %v", workerNode.Name, err)
|
|
return
|
|
}
|
|
log.Printf("Node-pool file updated: node %s marked offline", workerNode.Name)
|
|
}
|
|
// --- Drain node ---
|
|
func drainNode(cs *kubernetes.Clientset, nodeName string) error {
|
|
ctx := context.Background()
|
|
pods, err := cs.CoreV1().Pods("").List(ctx, metav1.ListOptions{
|
|
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, pod := range pods.Items {
|
|
if _, ok := pod.ObjectMeta.Annotations["kubernetes.io/config.mirror"]; ok {
|
|
continue
|
|
}
|
|
if pod.Namespace == "kube-system" {
|
|
continue
|
|
}
|
|
grace := int64(60)
|
|
_ = cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{
|
|
GracePeriodSeconds: &grace,
|
|
})
|
|
}
|
|
|
|
for {
|
|
remaining, _ := cs.CoreV1().Pods("").List(ctx, metav1.ListOptions{
|
|
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
|
|
})
|
|
active := 0
|
|
for _, p := range remaining.Items {
|
|
if p.Namespace != "kube-system" && p.DeletionTimestamp == nil {
|
|
active++
|
|
}
|
|
}
|
|
if active == 0 {
|
|
break
|
|
}
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// --- Cluster utilization ---
|
|
func clusterUtilization(cs *kubernetes.Clientset, ms *metrics.Clientset) (cpuPct int, memPct int, err error) {
|
|
ctx := context.Background()
|
|
nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
metricsList, err := ms.MetricsV1beta1().NodeMetricses().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
var totalCPUCap, totalMemCap, totalCPUUse, totalMemUse int64
|
|
for _, n := range nodes.Items {
|
|
totalCPUCap += n.Status.Capacity.Cpu().MilliValue()
|
|
totalMemCap += n.Status.Capacity.Memory().Value()
|
|
}
|
|
|
|
for _, m := range metricsList.Items {
|
|
totalCPUUse += m.Usage.Cpu().MilliValue()
|
|
totalMemUse += m.Usage.Memory().Value()
|
|
}
|
|
|
|
cpuPct = int((totalCPUUse * 100) / totalCPUCap)
|
|
memPct = int((totalMemUse * 100) / totalMemCap)
|
|
return cpuPct, memPct, nil
|
|
}
|
|
|
|
// --- Web GUI ---
|
|
func startWebGUI(poolFileName string, clientset *kubernetes.Clientset, metricsClient *metrics.Clientset, clusterName string) {
|
|
http.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
|
|
pool, err := loadPool(poolFileName, clusterName)
|
|
if err != nil {
|
|
log.Println("Cannot load node pool:", err)
|
|
http.Error(w, "cannot load node pool", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
cpuPct, memPct, err := clusterUtilization(clientset, metricsClient)
|
|
if err != nil {
|
|
log.Println("clusterUtilization error:", err)
|
|
}
|
|
|
|
resp := StatusResponse{
|
|
ClusterName: clusterName,
|
|
ClusterCPU: cpuPct,
|
|
ClusterMemory: memPct,
|
|
Nodes: pool.Nodes,
|
|
}
|
|
|
|
log.Printf(
|
|
"StatusResponse: cluster=%q cpu=%d mem=%d nodes=%d\n",
|
|
resp.ClusterName, resp.ClusterCPU, resp.ClusterMemory, len(pool.Nodes),
|
|
)
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(resp)
|
|
})
|
|
|
|
// Serve static files
|
|
http.Handle("/", http.FileServer(http.Dir("/app/web")))
|
|
|
|
go func() {
|
|
log.Println("Web GUI running at :8080")
|
|
if err := http.ListenAndServe(":8080", nil); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
}()
|
|
}
|
|
// --- Main ---
|
|
func main() {
|
|
poolFileName := mustEnv("NODE_POOL_FILE_NAME", "node-pool.yaml")
|
|
sshUser := mustEnv("NODE_SSH_USER", "ubuntu")
|
|
sshPass := mustEnv("NODE_SSH_PASS", "")
|
|
// read thresholds from environment
|
|
minCPU := mustIntEnv("MIN_CPU", 40)
|
|
minMem := mustIntEnv("MIN_MEM", 50)
|
|
maxCPU := mustIntEnv("MAX_CPU", 90)
|
|
maxMem := mustIntEnv("MAX_MEM", 90)
|
|
clusterName := mustEnv("CLUSTER_NAME", "clustermain")
|
|
desiredCP := mustIntEnv("DESIRED_CONTROLPLANES", 1)
|
|
// Kubernetes client
|
|
config, err := rest.InClusterConfig()
|
|
if err != nil {
|
|
log.Fatal("Cannot create in-cluster config:", err)
|
|
}
|
|
|
|
clientset, err := kubernetes.NewForConfig(config)
|
|
if err != nil {
|
|
log.Fatal("Cannot create clientset:", err)
|
|
}
|
|
|
|
metricsClient, err := metrics.NewForConfig(config)
|
|
if err != nil {
|
|
log.Fatal("Cannot create metrics client:", err)
|
|
}
|
|
|
|
// Web GUI
|
|
startWebGUI(poolFileName, clientset, metricsClient, clusterName)
|
|
// Main control loop
|
|
for {
|
|
// Always load pool fresh from disk
|
|
pool, err := loadPool(poolFileName, clusterName)
|
|
if err != nil {
|
|
log.Println("Cannot load node pool:", err)
|
|
time.Sleep(10 * time.Second)
|
|
continue
|
|
}
|
|
|
|
// Update ONLY nodes that exist in THIS cluster
|
|
err = updateNodePool(clientset, metricsClient, poolFileName, sshUser, sshPass, clusterName)
|
|
if err != nil {
|
|
log.Printf("updateNodePool error: %v", err)
|
|
}
|
|
// Ensure control-plane count
|
|
ensureControlPlanes(clientset, poolFileName, sshUser, sshPass, clusterName, desiredCP)
|
|
|
|
// cluster utilization
|
|
cpuPct, memPct, err := clusterUtilization(clientset, metricsClient)
|
|
if err == nil {
|
|
log.Printf("Cluster utilization: CPU %d%%, MEM %d%%", cpuPct, memPct)
|
|
}
|
|
|
|
// decide worker activation based on thresholds
|
|
// if cpuPct > maxCPU || memPct > maxMem {
|
|
if cpuPct > maxCPU {
|
|
activateOneWorker(clientset, pool, poolFileName, sshUser, sshPass, clusterName)
|
|
}
|
|
|
|
// decide worker deactivation based on thresholds
|
|
// if cpuPct < minCPU && memPct < minMem {
|
|
if cpuPct < minCPU {
|
|
deactivateOneWorkerSafe(clientset, pool, poolFileName, sshUser, sshPass, clusterName, 60)
|
|
}
|
|
|
|
time.Sleep(10 * time.Second)
|
|
}
|
|
}
|