573 lines
17 KiB
Plaintext
573 lines
17 KiB
Plaintext
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
"strconv"
|
|
"gopkg.in/yaml.v3"
|
|
"golang.org/x/crypto/ssh"
|
|
v1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
metrics "k8s.io/metrics/pkg/client/clientset/versioned"
|
|
)
|
|
|
|
type (
	// Node is one machine tracked in the node-pool file. The struct tags fix
	// the on-disk YAML layout and the JSON returned by /status, so the field
	// order and tag names are part of that format.
	Node struct {
		Name       string `yaml:"name" json:"name"`               // Kubernetes node name
		IP         string `yaml:"ip" json:"ip"`                   // address used for SSH
		Status     string `yaml:"status" json:"status"`           // "online" or "offline"
		CPU        int    `yaml:"cpu" json:"cpu"`                 // CPU usage, percent of capacity
		Memory     int    `yaml:"memory" json:"memory"`           // memory usage, percent of capacity
		Role       string `yaml:"role" json:"role"`               // "worker", "microk8s-controlplane" or "none"
		Cluster    string `yaml:"cluster" json:"cluster"`         // owning cluster name, or "none"
		LastActive string `yaml:"last_active" json:"last_active"` // RFC 3339 timestamp
	}

	// NodePool is the top-level document stored in the node-pool file.
	NodePool struct {
		Nodes []Node `yaml:"nodes" json:"nodes"`
	}
)
|
|
|
|
// mustIntEnv returns the integer value of the environment variable name, or
// def when the variable is unset or empty.
//
// A value that is set but cannot be parsed by strconv.Atoi also yields def,
// and the problem is logged so that a typo in a threshold (e.g. MAX_CPU) is
// visible. Despite the "must" prefix, this function never exits or panics.
func mustIntEnv(name string, def int) int {
	raw := os.Getenv(name)
	if raw == "" {
		return def
	}
	i, err := strconv.Atoi(raw)
	if err != nil {
		log.Printf("invalid integer %q for %s, using default %d: %v", raw, name, def, err)
		return def
	}
	return i
}
|
|
|
|
// loadPool unchanged
|
|
func loadPool(file string) (*NodePool, error) {
|
|
data, err := ioutil.ReadFile(file)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var pool NodePool
|
|
err = yaml.Unmarshal(data, &pool)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &pool, nil
|
|
}
|
|
|
|
// savePool
|
|
func savePool(file string, pool *NodePool) error {
|
|
data, err := yaml.Marshal(pool)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return ioutil.WriteFile(file, data, 0644)
|
|
}
|
|
|
|
// initNodePool syncs the node pool file with the real nodes in the cluster
|
|
func initNodePool(clientset *kubernetes.Clientset, poolFile string) (*NodePool, error) {
|
|
ctx := context.Background()
|
|
|
|
// Load existing pool, if it exists
|
|
pool, err := loadPool(poolFile)
|
|
if err != nil {
|
|
log.Println("Failed to load existing node pool, starting fresh:", err)
|
|
pool = &NodePool{}
|
|
}
|
|
|
|
// Get current nodes from Kubernetes
|
|
nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list nodes: %w", err)
|
|
}
|
|
|
|
// Build a map for faster lookup
|
|
poolMap := map[string]*Node{}
|
|
for i := range pool.Nodes {
|
|
poolMap[pool.Nodes[i].Name] = &pool.Nodes[i]
|
|
}
|
|
|
|
// Update pool with nodes that exist in the cluster
|
|
updatedNodes := []Node{}
|
|
for _, n := range nodes.Items {
|
|
ip := nodeIP(&n)
|
|
status := "online"
|
|
role := "worker"
|
|
if isControlPlane(&n) {
|
|
role = "microk8s-controlplane"
|
|
}
|
|
|
|
node := Node{
|
|
Name: n.Name,
|
|
IP: ip,
|
|
Status: status,
|
|
Role: role,
|
|
Cluster: os.Getenv("CLUSTER_NAME"),
|
|
CPU: 0, // will be updated by metrics
|
|
Memory: 0, // will be updated by metrics
|
|
LastActive: time.Now().Format(time.RFC3339),
|
|
}
|
|
|
|
// If this node existed in the old pool, preserve offline status or timestamps
|
|
if oldNode, ok := poolMap[n.Name]; ok {
|
|
node.Status = oldNode.Status
|
|
node.LastActive = oldNode.LastActive
|
|
node.CPU = oldNode.CPU
|
|
node.Memory = oldNode.Memory
|
|
node.Cluster = oldNode.Cluster
|
|
node.Role = oldNode.Role
|
|
}
|
|
|
|
updatedNodes = append(updatedNodes, node)
|
|
}
|
|
|
|
pool.Nodes = updatedNodes
|
|
|
|
// Save back to file
|
|
savePool(poolFile, pool)
|
|
log.Printf("Initialized node pool with %d nodes", len(pool.Nodes))
|
|
|
|
return pool, nil
|
|
}
|
|
|
|
// runSSH unchanged
|
|
func runSSH(host, user, pass, cmd string) (string, error) {
|
|
config := &ssh.ClientConfig{
|
|
User: user,
|
|
Auth: []ssh.AuthMethod{ssh.Password(pass)},
|
|
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
client, err := ssh.Dial("tcp", fmt.Sprintf("%s:22", host), config)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer client.Close()
|
|
session, err := client.NewSession()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer session.Close()
|
|
out, err := session.CombinedOutput(cmd)
|
|
return string(out), err
|
|
}
|
|
|
|
// isControlPlane unchanged
|
|
func isControlPlane(n *v1.Node) bool {
|
|
if _, ok := n.Labels["node.kubernetes.io/microk8s-controlplane"]; ok {
|
|
return true
|
|
}
|
|
if _, ok := n.Labels["node-role.kubernetes.io/control-plane"]; ok {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// nodeIP unchanged
|
|
func nodeIP(n *v1.Node) string {
|
|
for _, addr := range n.Status.Addresses {
|
|
if addr.Type == v1.NodeInternalIP {
|
|
return addr.Address
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
// Calculate per node utilization
|
|
func updatePerNodeUtilization(
|
|
cs *kubernetes.Clientset,
|
|
ms *metrics.Clientset,
|
|
pool *NodePool,
|
|
) error {
|
|
ctx := context.Background()
|
|
|
|
nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
metricsList, err := ms.MetricsV1beta1().NodeMetricses().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Build lookup tables
|
|
capCPU := map[string]int64{}
|
|
capMem := map[string]int64{}
|
|
|
|
for _, n := range nodes.Items {
|
|
capCPU[n.Name] = n.Status.Capacity.Cpu().MilliValue()
|
|
capMem[n.Name] = n.Status.Capacity.Memory().Value()
|
|
}
|
|
|
|
usageCPU := map[string]int64{}
|
|
usageMem := map[string]int64{}
|
|
|
|
for _, m := range metricsList.Items {
|
|
usageCPU[m.Name] = m.Usage.Cpu().MilliValue()
|
|
usageMem[m.Name] = m.Usage.Memory().Value()
|
|
}
|
|
|
|
// Update pool entries
|
|
for i := range pool.Nodes {
|
|
name := pool.Nodes[i].Name
|
|
|
|
cpuCap, ok1 := capCPU[name]
|
|
memCap, ok2 := capMem[name]
|
|
cpuUse, ok3 := usageCPU[name]
|
|
memUse, ok4 := usageMem[name]
|
|
|
|
if ok1 && ok2 && ok3 && ok4 && cpuCap > 0 && memCap > 0 {
|
|
pool.Nodes[i].CPU = int((cpuUse * 100) / cpuCap)
|
|
pool.Nodes[i].Memory = int((memUse * 100) / memCap)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// clusterUtilization
|
|
func clusterUtilization(
|
|
cs *kubernetes.Clientset,
|
|
ms *metrics.Clientset,
|
|
) (cpuPct int, memPct int, err error) {
|
|
|
|
ctx := context.Background()
|
|
nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
metricsList, err := ms.MetricsV1beta1().NodeMetricses().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
var (
|
|
totalCPUCap int64
|
|
totalMemCap int64
|
|
totalCPUUse int64
|
|
totalMemUse int64
|
|
)
|
|
|
|
for _, n := range nodes.Items {
|
|
totalCPUCap += n.Status.Capacity.Cpu().MilliValue()
|
|
totalMemCap += n.Status.Capacity.Memory().Value()
|
|
}
|
|
|
|
for _, m := range metricsList.Items {
|
|
totalCPUUse += m.Usage.Cpu().MilliValue()
|
|
totalMemUse += m.Usage.Memory().Value()
|
|
}
|
|
|
|
cpuPct = int((totalCPUUse * 100) / totalCPUCap)
|
|
memPct = int((totalMemUse * 100) / totalMemCap)
|
|
|
|
return cpuPct, memPct, nil
|
|
}
|
|
|
|
// ensureControlPlanes unchanged
|
|
func ensureControlPlanes(
|
|
cs *kubernetes.Clientset,
|
|
pool *NodePool,
|
|
poolFile, sshUser, sshPass, clusterName string,
|
|
desired int,
|
|
) {
|
|
ctx := context.Background()
|
|
nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
log.Println("failed to list nodes:", err)
|
|
return
|
|
}
|
|
|
|
cpCount := 0
|
|
var seedIP string
|
|
for _, n := range nodes.Items {
|
|
cpCount++
|
|
if seedIP == "" {
|
|
seedIP = nodeIP(&n)
|
|
}
|
|
}
|
|
|
|
if cpCount >= desired {
|
|
return
|
|
}
|
|
|
|
if seedIP == "" {
|
|
log.Println("no available node found in the node-pool")
|
|
return
|
|
}
|
|
|
|
for i := range pool.Nodes {
|
|
n := &pool.Nodes[i]
|
|
if n.Status != "offline" {
|
|
continue
|
|
}
|
|
|
|
log.Printf("Attempting to activate node %s as control-plane", n.Name)
|
|
out, err := runSSH(seedIP, sshUser, sshPass, "microk8s add-node")
|
|
if err != nil {
|
|
log.Println("add-node failed:", err)
|
|
return
|
|
}
|
|
|
|
var joinCmd string
|
|
for _, line := range strings.Split(out, "\n") {
|
|
if strings.HasPrefix(strings.TrimSpace(line), "microk8s join") {
|
|
joinCmd = strings.TrimSpace(line)
|
|
break
|
|
}
|
|
}
|
|
|
|
if joinCmd == "" {
|
|
log.Println("no join command found")
|
|
return
|
|
}
|
|
|
|
_, err = runSSH(n.IP, sshUser, sshPass, joinCmd)
|
|
if err != nil {
|
|
log.Println("join failed:", err)
|
|
return
|
|
}
|
|
n.Status = "online"
|
|
n.Cluster = clusterName
|
|
n.Role = "microk8s-controlplane"
|
|
n.LastActive = time.Now().Format(time.RFC3339)
|
|
|
|
log.Printf("Control-plane %s activated (%d/%d)", n.Name, cpCount+1, desired)
|
|
return
|
|
}
|
|
}
|
|
|
|
// activateOneWorker unchanged
|
|
func activateOneWorker(
|
|
cs *kubernetes.Clientset,
|
|
pool *NodePool,
|
|
poolFile, sshUser, sshPass, clusterName string,
|
|
) {
|
|
ctx := context.Background()
|
|
var workerNode *Node
|
|
for i := range pool.Nodes {
|
|
n := &pool.Nodes[i]
|
|
if n.Status == "offline" {
|
|
workerNode = &pool.Nodes[i]
|
|
break
|
|
}
|
|
}
|
|
|
|
if workerNode == nil {
|
|
log.Println("No offline nodes available — skipping activation")
|
|
return
|
|
}
|
|
|
|
log.Printf("Attempting to activate %s node as worker node", workerNode.Name)
|
|
nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
log.Println("Failed to list nodes:", err)
|
|
return
|
|
}
|
|
|
|
var seedIP string
|
|
for _, n := range nodes.Items {
|
|
if isControlPlane(&n) {
|
|
seedIP = nodeIP(&n)
|
|
break
|
|
}
|
|
}
|
|
|
|
if seedIP == "" {
|
|
log.Println("No control-plane node available to generate join command")
|
|
return
|
|
}
|
|
|
|
out, err := runSSH(seedIP, sshUser, sshPass, "microk8s add-node")
|
|
if err != nil {
|
|
log.Println("add-node failed:", err)
|
|
return
|
|
}
|
|
|
|
var joinCmd string
|
|
for _, line := range strings.Split(out, "\n") {
|
|
if strings.HasPrefix(strings.TrimSpace(line), "microk8s join") {
|
|
joinCmd = strings.TrimSpace(line)
|
|
break
|
|
}
|
|
}
|
|
|
|
if joinCmd == "" {
|
|
log.Println("No join command found from seed control-plane")
|
|
return
|
|
}
|
|
|
|
joinCmd += " --worker"
|
|
_, err = runSSH(workerNode.IP, sshUser, sshPass, joinCmd)
|
|
if err != nil {
|
|
log.Println("Worker join failed:", err)
|
|
return
|
|
}
|
|
|
|
workerNode.Status = "online"
|
|
workerNode.Cluster = clusterName
|
|
workerNode.Role = "worker"
|
|
workerNode.LastActive = time.Now().Format(time.RFC3339)
|
|
|
|
log.Printf("Nnode %s successfully activated as workernode", workerNode.Name)
|
|
}
|
|
|
|
// deactivateOneWorker
|
|
func deactivateOneWorker(cs *kubernetes.Clientset, pool *NodePool, poolFile, sshUser, sshPass, clusterName string, waitSec int) {
|
|
ctx := context.Background()
|
|
nodes, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
log.Println("failed to list nodes:", err)
|
|
return
|
|
}
|
|
|
|
var workerNode *v1.Node
|
|
for _, n := range nodes.Items {
|
|
if !isControlPlane(&n) && !n.Spec.Unschedulable {
|
|
workerNode = &n
|
|
break
|
|
}
|
|
}
|
|
|
|
if workerNode == nil {
|
|
log.Println("No worker nodes available for deactivation")
|
|
return
|
|
}
|
|
|
|
log.Printf("Deactivating node: %s", workerNode.Name)
|
|
ip := nodeIP(workerNode)
|
|
|
|
workerNode.Spec.Unschedulable = true
|
|
_, _ = cs.CoreV1().Nodes().Update(ctx, workerNode, metav1.UpdateOptions{})
|
|
|
|
_, err = runSSH(ip, sshUser, sshPass, "microk8s leave")
|
|
if err != nil {
|
|
log.Println("microk8s leave failed:", err)
|
|
// Uncordon so node can be retried later
|
|
workerNode.Spec.Unschedulable = false
|
|
_, _ = cs.CoreV1().Nodes().Update(ctx, workerNode, metav1.UpdateOptions{})
|
|
log.Println("Node uncordoned to allow retry later")
|
|
return
|
|
}
|
|
|
|
_ = cs.CoreV1().Nodes().Delete(ctx, workerNode.Name, metav1.DeleteOptions{})
|
|
|
|
for i := range pool.Nodes {
|
|
if pool.Nodes[i].Name == workerNode.Name {
|
|
pool.Nodes[i].Status = "offline"
|
|
pool.Nodes[i].Cluster = "none"
|
|
pool.Nodes[i].Role = "none"
|
|
pool.Nodes[i].CPU = 0
|
|
pool.Nodes[i].Memory = 0
|
|
pool.Nodes[i].LastActive = time.Now().Format(time.RFC3339)
|
|
}
|
|
}
|
|
time.Sleep(time.Duration(waitSec) * time.Second)
|
|
}
|
|
|
|
// startWebGUI starts a simple HTTP server showing the node pool
|
|
func startWebGUI(poolFile string) {
|
|
|
|
// API endpoint FIRST
|
|
http.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
|
|
log.Println("Serving /status")
|
|
|
|
pool, err := loadPool(poolFile)
|
|
if err != nil {
|
|
log.Println("Cannot load node pool:", err)
|
|
http.Error(w, "cannot load node pool", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(pool)
|
|
})
|
|
|
|
// Static files LAST (catch-all)
|
|
fs := http.FileServer(http.Dir("/app/web"))
|
|
http.Handle("/", fs)
|
|
|
|
go func() {
|
|
log.Println("Web GUI running at :8080")
|
|
if err := http.ListenAndServe(":8080", nil); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
}()
|
|
}
|
|
// --- MAIN ---
|
|
func main() {
|
|
poolFile := os.Getenv("NODE_LIST_FILE")
|
|
sshUser := os.Getenv("NODE_SSH_USER")
|
|
sshPass := os.Getenv("NODE_SSH_PASS")
|
|
minCPU := mustIntEnv("MIN_CPU", 20)
|
|
maxCPU := mustIntEnv("MAX_CPU", 80)
|
|
minMem := mustIntEnv("MIN_MEM", 30)
|
|
maxMem := mustIntEnv("MAX_MEM", 80)
|
|
desiredCP := mustIntEnv("DESIRED_CONTROL_PLANES", 3)
|
|
waitSec := mustIntEnv("DEACTIVATE_WAIT_SEC", 120)
|
|
clusterName := os.Getenv("CLUSTER_NAME")
|
|
if clusterName == "" {
|
|
log.Fatal("CLUSTER_NAME must be set")
|
|
}
|
|
// 🔎 STARTUP CONFIG DUMP
|
|
log.Println("=== startup configuration ===")
|
|
log.Printf("NODE_LIST_FILE=%q", poolFile)
|
|
log.Printf("NODE_SSH_USER=%q", sshUser)
|
|
log.Printf("MIN_CPU=%d", minCPU)
|
|
log.Printf("MAX_CPU=%d", maxCPU)
|
|
log.Printf("MIN_MEM=%d", minMem)
|
|
log.Printf("MAX_MEM=%d", maxMem)
|
|
log.Printf("DESIRED_CONTROL_PLANES=%d", desiredCP)
|
|
log.Printf("DEACTIVATE_WAIT_SEC=%d", waitSec)
|
|
log.Printf("CLUSTER_NAMED=%q", clusterName)
|
|
log.Println("=============================")
|
|
|
|
// START WEB GUI
|
|
startWebGUI(poolFile)
|
|
|
|
config, err := rest.InClusterConfig()
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
clientset, err := kubernetes.NewForConfig(config)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
// Initialize node pool before entering loop
|
|
_, err := initNodePool(clientset, poolFile)
|
|
if err != nil {
|
|
log.Fatal("Failed to initialize node pool:", err)
|
|
}
|
|
|
|
for {
|
|
pool, err := loadPool(poolFile)
|
|
if err != nil {
|
|
log.Println("cannot load node pool:", err)
|
|
time.Sleep(30 * time.Second)
|
|
continue
|
|
}
|
|
|
|
metricsClient, err := metrics.NewForConfig(config)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
ensureControlPlanes(clientset, pool, poolFile, sshUser, sshPass, clusterName, desiredCP)
|
|
|
|
clusterCPU, clusterMem, err := clusterUtilization(clientset, metricsClient)
|
|
if err != nil {
|
|
log.Println("failed to compute utilization:", err)
|
|
time.Sleep(30 * time.Second)
|
|
continue
|
|
}
|
|
|
|
log.Printf("Cluster CPU: %d%%, Mem:
|