Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 67 additions & 52 deletions cmd/module-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@ package main

import (
"context"
"errors"
"fmt"
"github.com/go-logr/logr"
"github.com/koupleless/virtual-kubelet/vnode_controller"
"os"
"os/signal"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
log2 "sigs.k8s.io/controller-runtime/pkg/log"
"strconv"
"syscall"

"github.com/google/uuid"
"github.com/koupleless/module_controller/common/model"
"github.com/koupleless/module_controller/controller/module_deployment_controller"
"github.com/koupleless/module_controller/module_tunnels"
"github.com/koupleless/module_controller/module_tunnels/koupleless_http_tunnel"
"github.com/koupleless/module_controller/module_tunnels/koupleless_mqtt_tunnel"
"github.com/koupleless/module_controller/report_server"
Expand All @@ -35,7 +40,6 @@ import (
"github.com/koupleless/virtual-kubelet/common/trace/opencensus"
"github.com/koupleless/virtual-kubelet/common/tracker"
"github.com/koupleless/virtual-kubelet/common/utils"
"github.com/koupleless/virtual-kubelet/controller/vnode_controller"
vkModel "github.com/koupleless/virtual-kubelet/model"
"github.com/koupleless/virtual-kubelet/tunnel"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -98,6 +102,11 @@ func main() {

// Initialize controller manager
kubeConfig := config.GetConfigOrDie()
// TODO: should support to set from parameter
kubeConfig.QPS = 100
kubeConfig.Burst = 200
ctrl.SetLogger(logr.New(log2.NullLogSink{}))
Comment on lines +105 to +108
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider making QPS and Burst configurable via environment variables.

The hardcoded QPS and Burst values might not be suitable for all environments. As noted in the TODO comment, these should be configurable parameters.

Apply this diff to make these values configurable:

-	// TODO: should support to set from parameter
-	kubeConfig.QPS = 100
-	kubeConfig.Burst = 200
+	qps, err := strconv.ParseFloat(utils.GetEnv("KUBE_QPS", "100"), 32)
+	if err != nil {
+		log.G(ctx).WithError(err).Error("failed to parse KUBE_QPS, using default value 100")
+		qps = 100
+	}
+	burst, err := strconv.Atoi(utils.GetEnv("KUBE_BURST", "200"))
+	if err != nil {
+		log.G(ctx).WithError(err).Error("failed to parse KUBE_BURST, using default value 200")
+		burst = 200
+	}
+	kubeConfig.QPS = float32(qps)
+	kubeConfig.Burst = burst

Committable suggestion skipped: line range outside the PR's diff.


Comment on lines +108 to +109
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Reconsider using NullLogSink for controller-runtime logging

Using NullLogSink will suppress all controller-runtime framework logs, which could make debugging issues in production more difficult. Consider using a proper logger configuration that allows for appropriate log levels.

-ctrl.SetLogger(logr.New(log2.NullLogSink{}))
+ctrl.SetLogger(log2.NewFunctionLogger(
+    func(prefix, args string) {
+        log.G(ctx).V(4).Info(prefix + " " + args)
+    },
+))

Committable suggestion skipped: line range outside the PR's diff.

mgr, err := manager.New(kubeConfig, manager.Options{
Cache: cache.Options{},
HealthProbeBindAddress: ":8081",
Expand All @@ -113,83 +122,42 @@ func main() {

tracker.SetTracker(&tracker.DefaultTracker{})

// Initialize tunnels based on configuration
tunnels := make([]tunnel.Tunnel, 0)
moduleTunnels := make([]module_tunnels.ModuleTunnel, 0)

mqttTunnelEnable := utils.GetEnv("ENABLE_MQTT_TUNNEL", "false")
if mqttTunnelEnable == "true" {
mqttTl := &koupleless_mqtt_tunnel.MqttTunnel{
Cache: mgr.GetCache(),
Client: mgr.GetClient(),
}

tunnels = append(tunnels, mqttTl)
moduleTunnels = append(moduleTunnels, mqttTl)
}

httpTunnelEnable := utils.GetEnv("ENABLE_HTTP_TUNNEL", "false")
if httpTunnelEnable == "true" {
httpTunnelListenPort, err := strconv.Atoi(utils.GetEnv("HTTP_TUNNEL_LISTEN_PORT", "7777"))

if err != nil {
log.G(ctx).WithError(err).Error("failed to parse HTTP_TUNNEL_LISTEN_PORT, set default port 7777")
httpTunnelListenPort = 7777
}

httpTl := &koupleless_http_tunnel.HttpTunnel{
Cache: mgr.GetCache(),
Client: mgr.GetClient(),
Port: httpTunnelListenPort,
}
tunnels = append(tunnels, httpTl)
moduleTunnels = append(moduleTunnels, httpTl)
}

// Configure and create VNode controller
rcc := vkModel.BuildVNodeControllerConfig{
ClientID: clientID,
Env: env,
VPodIdentity: model.ComponentModule,
VPodType: model.ComponentModule,
IsCluster: isCluster,
WorkloadMaxLevel: workloadMaxLevel,
VNodeWorkerNum: vnodeWorkerNum,
}

vc, err := vnode_controller.NewVNodeController(&rcc, tunnels)
mdc, err := module_deployment_controller.NewModuleDeploymentController(env)
if err != nil {
log.G(ctx).Error(err, "unable to set up VNodeController")
log.G(ctx).Error(err, "unable to set up module_deployment_controller")
return
Comment on lines +137 to 138
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure consistent program termination on initialization failures.

In cases where initialization fails, the program logs the error and returns, which might allow the program to continue running unintentionally. It's better to exit the program with a non-zero status code to indicate an error.

Apply this diff to exit the program upon initialization failures:

    // In module deployment controller setup failure
    if err != nil {
        log.G(ctx).Error(err, "unable to set up module_deployment_controller")
-       return
+       os.Exit(1)
    }

    // In VNodeController creation failure
    if err != nil {
        log.G(ctx).Error(err, "unable to set up VNodeController")
-       return
+       os.Exit(1)
    }

    // In VNodeController setup failure
    if err != nil {
        log.G(ctx).WithError(err).Error("unable to setup vnode controller")
-       return
+       os.Exit(1)
    }

Also applies to: 143-144, 147-148

}

err = vc.SetupWithManager(ctx, mgr)
err = mdc.SetupWithManager(ctx, mgr)
if err != nil {
Comment on lines +141 to 142
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Correct the error message to reference the correct controller.

The error message mentions "unable to setup vnode controller" while setting up the module deployment controller. This could cause confusion during debugging.

Apply this diff to fix the error message:

    if err != nil {
-       log.G(ctx).WithError(err).Error("unable to setup vnode controller")
+       log.G(ctx).WithError(err).Error("unable to setup module deployment controller")
        return
    }

Committable suggestion skipped: line range outside the PR's diff.

log.G(ctx).WithError(err).Error("unable to setup vnode controller")
return
}

Comment on lines +135 to 146
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Standardize error handling and fix error message

The error handling is inconsistent with other parts of the code:

  1. Some errors trigger os.Exit(1) while others just return
  2. The error message incorrectly mentions "vnode controller" instead of "module deployment controller"

Apply this diff to fix these issues:

    mdc, err := module_deployment_controller.NewModuleDeploymentController(env)
    if err != nil {
-       log.G(ctx).Error(err, "unable to set up module_deployment_controller")
-       return
+       log.G(ctx).Error(err, "unable to set up module deployment controller")
+       os.Exit(1)
    }

    err = mdc.SetupWithManager(ctx, mgr)
    if err != nil {
-       log.G(ctx).WithError(err).Error("unable to setup vnode controller")
-       return
+       log.G(ctx).WithError(err).Error("unable to setup module deployment controller")
+       os.Exit(1)
    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
mdc, err := module_deployment_controller.NewModuleDeploymentController(env)
if err != nil {
log.G(ctx).Error(err, "unable to set up VNodeController")
log.G(ctx).Error(err, "unable to set up module_deployment_controller")
return
}
err = vc.SetupWithManager(ctx, mgr)
err = mdc.SetupWithManager(ctx, mgr)
if err != nil {
log.G(ctx).WithError(err).Error("unable to setup vnode controller")
return
}
mdc, err := module_deployment_controller.NewModuleDeploymentController(env)
if err != nil {
log.G(ctx).Error(err, "unable to set up module deployment controller")
os.Exit(1)
}
err = mdc.SetupWithManager(ctx, mgr)
if err != nil {
log.G(ctx).WithError(err).Error("unable to setup module deployment controller")
os.Exit(1)
}

mdc, err := module_deployment_controller.NewModuleDeploymentController(env, moduleTunnels)
tunnel := startTunnels(ctx, clientID, env, mgr, mdc)

Comment on lines +147 to +148
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve error handling for tunnel initialization.

The startTunnels function uses panic for error handling, but this should be handled gracefully at the caller level. Consider modifying the function to return an error instead of panicking.

Apply this diff:

-func startTunnels(ctx context.Context, clientId string, env string, mgr manager.Manager,
-    moduleDeploymentController *module_deployment_controller.ModuleDeploymentController) tunnel.Tunnel {
+func startTunnels(ctx context.Context, clientId string, env string, mgr manager.Manager,
+    moduleDeploymentController *module_deployment_controller.ModuleDeploymentController) (tunnel.Tunnel, error) {

    // ... rest of the function ...
-    if startFailedCount > 0 {
-        panic(errors.New(fmt.Sprintf("failed to start %d tunnels", startFailedCount)))
-    } else if successTunnelCount == 0 {
-        panic(errors.New(fmt.Sprintf("successfully started 0 tunnels")))
-    }
+    if startFailedCount > 0 {
+        return nil, fmt.Errorf("failed to start %d tunnels", startFailedCount)
+    } else if successTunnelCount == 0 {
+        return nil, fmt.Errorf("successfully started 0 tunnels")
+    }
    return tunnels[0], nil
}

// And at the caller:
-tunnel := startTunnels(ctx, clientID, env, mgr, mdc)
+tunnel, err := startTunnels(ctx, clientID, env, mgr, mdc)
+if err != nil {
+    log.G(ctx).Error(err, "failed to initialize tunnels")
+    os.Exit(1)
+}

Committable suggestion skipped: line range outside the PR's diff.

vc, err := vnode_controller.NewVNodeController(&rcc, tunnel)
Comment on lines +147 to +149
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for tunnel initialization.

The tunnel returned by startTunnels is used directly without checking if it's valid. While startTunnels will panic on failure, it's better to handle errors gracefully at this level for better error reporting and cleanup.

Consider this approach:

-	tunnel := startTunnels(ctx, clientID, env, mgr, mdc)
-
-	vc, err := vnode_controller.NewVNodeController(&rcc, tunnel)
+	tunnel, err := startTunnels(ctx, clientID, env, mgr, mdc)
+	if err != nil {
+		log.G(ctx).Error(err, "failed to initialize tunnels")
+		os.Exit(1)
+	}
+
+	vc, err := vnode_controller.NewVNodeController(&rcc, tunnel)

Committable suggestion skipped: line range outside the PR's diff.

if err != nil {
log.G(ctx).Error(err, "unable to set up module_deployment_controller")
log.G(ctx).Error(err, "unable to set up VNodeController")
return
}

err = mdc.SetupWithManager(ctx, mgr)
err = vc.SetupWithManager(ctx, mgr)
if err != nil {
log.G(ctx).WithError(err).Error("unable to setup vnode controller")
return
}
Comment on lines +149 to 159
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Standardize error handling in VNode controller setup

The error handling in this section is inconsistent. Some errors trigger a return while others call os.Exit(1). This inconsistency could lead to unpredictable shutdown behavior.

Apply this diff to standardize error handling:

    vc, err := vnode_controller.NewVNodeController(&rcc, tunnel)
    if err != nil {
        log.G(ctx).Error(err, "unable to set up VNodeController")
-       return
+       os.Exit(1)
    }

    err = vc.SetupWithManager(ctx, mgr)
    if err != nil {
        log.G(ctx).WithError(err).Error("unable to setup vnode controller")
-       return
+       os.Exit(1)
    }

Committable suggestion skipped: line range outside the PR's diff.


// Start all tunnels
for _, t := range tunnels {
err = t.Start(ctx, clientID, env)
if err != nil {
log.G(ctx).WithError(err).Error("failed to start tunnel", t.Key())
} else {
log.G(ctx).Info("Tunnel started: ", t.Key())
}
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
log.G(ctx).Error(err, "unable to set up health check")
os.Exit(1)
Expand All @@ -205,3 +173,50 @@ func main() {
log.G(ctx).WithError(err).Error("failed to start manager")
}
}

func startTunnels(ctx context.Context, clientId string, env string, mgr manager.Manager,
moduleDeploymentController *module_deployment_controller.ModuleDeploymentController) tunnel.Tunnel {
// Initialize tunnels based on configuration
Comment on lines +177 to +179
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add early validation for tunnel configuration.

The function should validate that at least one tunnel type is enabled before proceeding.

Add this validation at the start of the function:

 func startTunnels(ctx context.Context, clientId string, env string, mgr manager.Manager,
 	moduleDeploymentController *module_deployment_controller.ModuleDeploymentController) tunnel.Tunnel {
+	mqttEnabled := utils.GetEnv("ENABLE_MQTT_TUNNEL", "false") == "true"
+	httpEnabled := utils.GetEnv("ENABLE_HTTP_TUNNEL", "false") == "true"
+	if !mqttEnabled && !httpEnabled {
+		panic(fmt.Errorf("no tunnel types are enabled, set ENABLE_MQTT_TUNNEL=true or ENABLE_HTTP_TUNNEL=true"))
+	}

Committable suggestion skipped: line range outside the PR's diff.

tunnels := make([]tunnel.Tunnel, 0)

mqttTunnelEnable := utils.GetEnv("ENABLE_MQTT_TUNNEL", "false")
if mqttTunnelEnable == "true" {
mqttTl := koupleless_mqtt_tunnel.NewMqttTunnel(env, mgr.GetClient(), moduleDeploymentController)
tunnels = append(tunnels, &mqttTl)
}

httpTunnelEnable := utils.GetEnv("ENABLE_HTTP_TUNNEL", "false")
if httpTunnelEnable == "true" {
httpTunnelListenPort, err := strconv.Atoi(utils.GetEnv("HTTP_TUNNEL_LISTEN_PORT", "7777"))

if err != nil {
log.G(ctx).WithError(err).Error("failed to parse HTTP_TUNNEL_LISTEN_PORT, set default port 7777")
httpTunnelListenPort = 7777
}

httpTl := koupleless_http_tunnel.NewHttpTunnel(env, mgr.GetClient(), moduleDeploymentController, httpTunnelListenPort)
tunnels = append(tunnels, &httpTl)
}

// Start all tunnels
successTunnelCount := 0
startFailedCount := 0
for _, t := range tunnels {
err := t.Start(ctx, clientId, env)
if err != nil {
log.G(ctx).WithError(err).Error("failed to start tunnel", t.Key())
startFailedCount++
} else {
log.G(ctx).Info("Tunnel started: ", t.Key())
successTunnelCount++
}
}
Comment on lines +204 to +213
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add synchronization for concurrent tunnel operations

The tunnel initialization loop lacks synchronization mechanisms. If multiple tunnels are started concurrently, the shared counters (successTunnelCount and startFailedCount) could lead to race conditions.

Apply this diff to add proper synchronization:

+	var mu sync.Mutex
 	for _, t := range tunnels {
 		err := t.Start(ctx, clientId, env)
 		if err != nil {
+			mu.Lock()
 			log.G(ctx).WithError(err).Error("failed to start tunnel", t.Key())
 			startFailedCount++
+			mu.Unlock()
 		} else {
+			mu.Lock()
 			log.G(ctx).Info("Tunnel started: ", t.Key())
 			successTunnelCount++
+			mu.Unlock()
 		}
 	}

Also, add the following import:

+import "sync"

Committable suggestion skipped: line range outside the PR's diff.


if startFailedCount > 0 {
panic(errors.New(fmt.Sprintf("failed to start %d tunnels", startFailedCount)))
} else if successTunnelCount == 0 {
panic(errors.New(fmt.Sprintf("successfully started 0 tunnels")))
}
// we only using one tunnel for now
return tunnels[0]
}
Comment on lines +215 to +222
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve error handling and tunnel selection.

The current implementation has several issues:

  1. Using panic for error handling is not idiomatic Go
  2. Accessing tunnels[0] without checking slice length is unsafe
  3. The comment about using only one tunnel suggests potential technical debt

Consider this more robust approach:

-	if startFailedCount > 0 {
-		panic(errors.New(fmt.Sprintf("failed to start %d tunnels", startFailedCount)))
-	} else if successTunnelCount == 0 {
-		panic(errors.New(fmt.Sprintf("successfully started 0 tunnels")))
-	}
-	// we only using one tunnel for now
-	return tunnels[0]
+	if startFailedCount > 0 {
+		return nil, fmt.Errorf("failed to start %d out of %d tunnels", 
+			startFailedCount, len(tunnels))
+	}
+	if successTunnelCount == 0 {
+		return nil, errors.New("no tunnels were configured to start")
+	}
+	if len(tunnels) == 0 {
+		return nil, errors.New("no tunnels were initialized")
+	}
+	// TODO: Define criteria for selecting which tunnel to use when multiple are available
+	return tunnels[0], nil
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if startFailedCount > 0 {
panic(errors.New(fmt.Sprintf("failed to start %d tunnels", startFailedCount)))
} else if successTunnelCount == 0 {
panic(errors.New(fmt.Sprintf("successfully started 0 tunnels")))
}
// we only using one tunnel for now
return tunnels[0]
}
if startFailedCount > 0 {
return nil, fmt.Errorf("failed to start %d out of %d tunnels",
startFailedCount, len(tunnels))
}
if successTunnelCount == 0 {
return nil, errors.New("no tunnels were configured to start")
}
if len(tunnels) == 0 {
return nil, errors.New("no tunnels were initialized")
}
// TODO: Define criteria for selecting which tunnel to use when multiple are available
return tunnels[0], nil
}

Comment on lines +177 to +222
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Refactor startTunnels for better error handling and safety

Several issues need attention:

  1. Using panic for error handling is not idiomatic Go
  2. Potential nil pointer dereference in t.Key()
  3. Unsafe array access at tunnels[0]
  4. Comment suggests technical debt

Consider this safer implementation:

 func startTunnels(ctx context.Context, clientId string, env string, mgr manager.Manager,
-    moduleDeploymentController *module_deployment_controller.ModuleDeploymentController) tunnel.Tunnel {
+    moduleDeploymentController *module_deployment_controller.ModuleDeploymentController) (tunnel.Tunnel, error) {
     tunnels := make([]tunnel.Tunnel, 0)

     mqttTunnelEnable := utils.GetEnv("ENABLE_MQTT_TUNNEL", "false")
     if mqttTunnelEnable == "true" {
         mqttTl := koupleless_mqtt_tunnel.NewMqttTunnel(env, mgr.GetClient(), moduleDeploymentController)
         tunnels = append(tunnels, &mqttTl)
     }

     httpTunnelEnable := utils.GetEnv("ENABLE_HTTP_TUNNEL", "false")
     if httpTunnelEnable == "true" {
         httpTunnelListenPort, err := strconv.Atoi(utils.GetEnv("HTTP_TUNNEL_LISTEN_PORT", "7777"))
         if err != nil {
             log.G(ctx).WithError(err).Error("failed to parse HTTP_TUNNEL_LISTEN_PORT, set default port 7777")
             httpTunnelListenPort = 7777
         }

         httpTl := koupleless_http_tunnel.NewHttpTunnel(env, mgr.GetClient(), moduleDeploymentController, httpTunnelListenPort)
         tunnels = append(tunnels, &httpTl)
     }

     if len(tunnels) == 0 {
         return nil, fmt.Errorf("no tunnels configured to start")
     }

     // Start all tunnels
     var startedTunnel tunnel.Tunnel
     for _, t := range tunnels {
         if t == nil {
             continue
         }
         err := t.Start(ctx, clientId, env)
         if err != nil {
             log.G(ctx).WithError(err).Error("failed to start tunnel", t.Key())
             continue
         }
         log.G(ctx).Info("Tunnel started: ", t.Key())
         startedTunnel = t
         break
     }

     if startedTunnel == nil {
         return nil, fmt.Errorf("failed to start any tunnel")
     }

     return startedTunnel, nil
 }

This implementation:

  1. Returns errors instead of panicking
  2. Checks for nil before calling methods
  3. Safely handles tunnel selection
  4. Provides clear error messages

Consider defining a strategy for handling multiple tunnels in the future, as the current implementation only uses the first successful tunnel.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func startTunnels(ctx context.Context, clientId string, env string, mgr manager.Manager,
moduleDeploymentController *module_deployment_controller.ModuleDeploymentController) tunnel.Tunnel {
// Initialize tunnels based on configuration
tunnels := make([]tunnel.Tunnel, 0)
mqttTunnelEnable := utils.GetEnv("ENABLE_MQTT_TUNNEL", "false")
if mqttTunnelEnable == "true" {
mqttTl := koupleless_mqtt_tunnel.NewMqttTunnel(env, mgr.GetClient(), moduleDeploymentController)
tunnels = append(tunnels, &mqttTl)
}
httpTunnelEnable := utils.GetEnv("ENABLE_HTTP_TUNNEL", "false")
if httpTunnelEnable == "true" {
httpTunnelListenPort, err := strconv.Atoi(utils.GetEnv("HTTP_TUNNEL_LISTEN_PORT", "7777"))
if err != nil {
log.G(ctx).WithError(err).Error("failed to parse HTTP_TUNNEL_LISTEN_PORT, set default port 7777")
httpTunnelListenPort = 7777
}
httpTl := koupleless_http_tunnel.NewHttpTunnel(env, mgr.GetClient(), moduleDeploymentController, httpTunnelListenPort)
tunnels = append(tunnels, &httpTl)
}
// Start all tunnels
successTunnelCount := 0
startFailedCount := 0
for _, t := range tunnels {
err := t.Start(ctx, clientId, env)
if err != nil {
log.G(ctx).WithError(err).Error("failed to start tunnel", t.Key())
startFailedCount++
} else {
log.G(ctx).Info("Tunnel started: ", t.Key())
successTunnelCount++
}
}
if startFailedCount > 0 {
panic(errors.New(fmt.Sprintf("failed to start %d tunnels", startFailedCount)))
} else if successTunnelCount == 0 {
panic(errors.New(fmt.Sprintf("successfully started 0 tunnels")))
}
// we only using one tunnel for now
return tunnels[0]
}
func startTunnels(ctx context.Context, clientId string, env string, mgr manager.Manager,
moduleDeploymentController *module_deployment_controller.ModuleDeploymentController) (tunnel.Tunnel, error) {
tunnels := make([]tunnel.Tunnel, 0)
mqttTunnelEnable := utils.GetEnv("ENABLE_MQTT_TUNNEL", "false")
if mqttTunnelEnable == "true" {
mqttTl := koupleless_mqtt_tunnel.NewMqttTunnel(env, mgr.GetClient(), moduleDeploymentController)
tunnels = append(tunnels, &mqttTl)
}
httpTunnelEnable := utils.GetEnv("ENABLE_HTTP_TUNNEL", "false")
if httpTunnelEnable == "true" {
httpTunnelListenPort, err := strconv.Atoi(utils.GetEnv("HTTP_TUNNEL_LISTEN_PORT", "7777"))
if err != nil {
log.G(ctx).WithError(err).Error("failed to parse HTTP_TUNNEL_LISTEN_PORT, set default port 7777")
httpTunnelListenPort = 7777
}
httpTl := koupleless_http_tunnel.NewHttpTunnel(env, mgr.GetClient(), moduleDeploymentController, httpTunnelListenPort)
tunnels = append(tunnels, &httpTl)
}
if len(tunnels) == 0 {
return nil, fmt.Errorf("no tunnels configured to start")
}
// Start all tunnels
var startedTunnel tunnel.Tunnel
for _, t := range tunnels {
if t == nil {
continue
}
err := t.Start(ctx, clientId, env)
if err != nil {
log.G(ctx).WithError(err).Error("failed to start tunnel", t.Key())
continue
}
log.G(ctx).Info("Tunnel started: ", t.Key())
startedTunnel = t
break
}
if startedTunnel == nil {
return nil, fmt.Errorf("failed to start any tunnel")
}
return startedTunnel, nil
}

Comment on lines +220 to +222
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add safety checks before accessing tunnels slice.

The comment suggests only using one tunnel, but accessing tunnels[0] without checking the slice length could panic.

Apply this diff:

-    // we only using one tunnel for now
-    return tunnels[0]
+    if len(tunnels) == 0 {
+        return nil, fmt.Errorf("no tunnels were initialized")
+    }
+    // TODO: Define criteria for selecting which tunnel to use when multiple are available
+    return tunnels[0], nil
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// we only using one tunnel for now
return tunnels[0]
}
if len(tunnels) == 0 {
return nil, fmt.Errorf("no tunnels were initialized")
}
// TODO: Define criteria for selecting which tunnel to use when multiple are available
return tunnels[0], nil
}

18 changes: 9 additions & 9 deletions common/model/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,26 @@ const (

// MQTT topic patterns for base communication
const (
// BaseHeartBeatTopic for heartbeat messages
// BaseHeartBeatTopic for heartbeat messages, broadcast mode
BaseHeartBeatTopic = "koupleless_%s/+/base/heart"
// BaseQueryBaselineTopic for baseline queries
// BaseQueryBaselineTopic for baseline queries, broadcast mode
BaseQueryBaselineTopic = "koupleless_%s/+/base/queryBaseline"
// BaseHealthTopic for health status
// BaseHealthTopic for health status, p2p mode
BaseHealthTopic = "koupleless_%s/%s/base/health"
// BaseSimpleBizTopic for simple business operations
// BaseSimpleBizTopic for simple business operations, p2p mode
BaseSimpleBizTopic = "koupleless_%s/%s/base/simpleBiz"
// BaseAllBizTopic for all business operations
// BaseAllBizTopic for all business operations, p2p mode
BaseAllBizTopic = "koupleless_%s/%s/base/biz"
// BaseBizOperationResponseTopic for business operation responses
// BaseBizOperationResponseTopic for business operation responses, p2p mode
BaseBizOperationResponseTopic = "koupleless_%s/%s/base/bizOperation"
// BaseBaselineResponseTopic for baseline responses
// BaseBaselineResponseTopic for baseline responses, p2p mode
BaseBaselineResponseTopic = "koupleless_%s/%s/base/baseline"
)

// Base labels
const (
// LabelKeyOfTechStack specifies the technology stack
LabelKeyOfTechStack = "base.koupleless.io/stack"
// LabelKeyOfArkletPort specifies the arklet port
LabelKeyOfArkletPort = "base.koupleless.io/arklet-port"
// LabelKeyOfTunnelPort specifies the tunnel port
LabelKeyOfTunnelPort = "base.koupleless.io/tunnel-port"
)
37 changes: 16 additions & 21 deletions common/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,37 @@ type ArkMqttMsg[T any] struct {
Data T `json:"data"`
}

// Metadata contains basic identifying information
type Metadata struct {
Name string `json:"name"` // Name of the resource
Version string `json:"version"` // Version identifier
// BaseMetadata contains basic identifying information
type BaseMetadata struct {
Identity string `json:"identity"`
Version string `json:"version"` // Version identifier
ClusterName string `json:"clusterName"` // ClusterName of the resource communicate with base
}

// HeartBeatData is the data of base heart beat.
// BaseStatus is the data of base heart beat.
// Contains information about the base node's status and network details
type HeartBeatData struct {
BaseID string `json:"baseID"` // Unique identifier for the base
State string `json:"state"` // Current state of the base
MasterBizInfo Metadata `json:"masterBizInfo"` // Master business info metadata
NetworkInfo NetworkInfo `json:"networkInfo"` // Network configuration details
}

// NetworkInfo contains network-related configuration
type NetworkInfo struct {
LocalIP string `json:"localIP"` // Local IP address
LocalHostName string `json:"localHostName"` // Local hostname
ArkletPort int `json:"arkletPort"` // Port number for arklet service
type BaseStatus struct {
BaseMetadata BaseMetadata `json:"baseMetadata"` // Master business info metadata
LocalIP string `json:"localIP"` // Local IP address
LocalHostName string `json:"localHostName"` // Local hostname
Port int `json:"port"` // Port number for arklet service
State string `json:"state"` // Current state of the base
}

// BizOperationResponse represents the response from a business operation
type BizOperationResponse struct {
Command string `json:"command"` // Operation command executed
BizName string `json:"bizName"` // Name of the business
BizName string `json:"bizName"` // ClusterName of the business
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider renaming BizName field to match its purpose.

The field name BizName representing a ClusterName could be confusing. Consider renaming for clarity:

-    BizName    string                  `json:"bizName"`    // ClusterName of the business
+    ClusterName string                  `json:"bizName"`    // ClusterName of the business

Note: Keep the JSON tag as "bizName" for backward compatibility if needed.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
BizName string `json:"bizName"` // ClusterName of the business
ClusterName string `json:"bizName"` // ClusterName of the business

BizVersion string `json:"bizVersion"` // Version of the business
Response ark_service.ArkResponse `json:"response"` // Response from ark service
}

// QueryBaselineRequest is the request parameters of query baseline func
// Used to query baseline configuration with filters
type QueryBaselineRequest struct {
Name string `json:"name"` // Name to filter by
Version string `json:"version"` // Version to filter by
CustomLabels map[string]string `json:"customLabels"` // Additional label filters
Identity string `json:"identity"` // Identity base to filter by
ClusterName string `json:"clusterName"` // ClusterName to filter by
Version string `json:"version"` // Version to filter by
}

// BuildModuleDeploymentControllerConfig contains controller configuration
Expand Down
Loading