diff --git a/.github/workflows/release-next.yaml b/.github/workflows/release-next.yaml index ec55566..1b6e9fc 100644 --- a/.github/workflows/release-next.yaml +++ b/.github/workflows/release-next.yaml @@ -1,11 +1,12 @@ -name: Sentinel Release +name: Sentinel Release Development on: push: branches: ['next'] env: - REGISTRY: ghcr.io + GITHUB_REGISTRY: ghcr.io + DOCKER_REGISTRY: docker.io IMAGE_NAME: "coollabsio/sentinel" jobs: @@ -16,20 +17,33 @@ jobs: packages: write steps: - uses: actions/checkout@v4 - - name: Login to ghcr.io + + - name: Login to ${{ env.GITHUB_REGISTRY }} uses: docker/login-action@v3 with: - registry: ${{ env.REGISTRY }} + registry: ${{ env.GITHUB_REGISTRY }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - - name: Build and push - uses: docker/build-push-action@v5 + + - name: Login to ${{ env.DOCKER_REGISTRY }} + uses: docker/login-action@v3 + with: + registry: ${{ env.DOCKER_REGISTRY }} + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_TOKEN }} + + - name: Build and Push Image + uses: docker/build-push-action@v6 with: context: . file: Dockerfile platforms: linux/amd64 push: true - tags: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:next + tags: | + ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}:next-amd64 + ${{ env.GITHUB_REGISTRY }}/${{ env.IMAGE_NAME }}:next-amd64 + labels: | + coolify.managed=true aarch64: runs-on: [ self-hosted, arm64 ] permissions: @@ -37,20 +51,33 @@ jobs: packages: write steps: - uses: actions/checkout@v4 - - name: Login to ghcr.io + + - name: Login to ${{ env.GITHUB_REGISTRY }} uses: docker/login-action@v3 with: - registry: ${{ env.REGISTRY }} + registry: ${{ env.GITHUB_REGISTRY }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - - name: Build and push - uses: docker/build-push-action@v5 + + - name: Login to ${{ env.DOCKER_REGISTRY }} + uses: docker/login-action@v3 + with: + registry: ${{ env.DOCKER_REGISTRY }} + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_TOKEN }} + + - name: Build and Push Image + uses: docker/build-push-action@v6 with: context: . 
file: Dockerfile.arm64 platforms: linux/aarch64 push: true - tags: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:next-aarch64 + tags: | + ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}:next-aarch64 + ${{ env.GITHUB_REGISTRY }}/${{ env.IMAGE_NAME }}:next-aarch64 + labels: | + coolify.managed=true merge-manifest: runs-on: ubuntu-latest permissions: @@ -58,18 +85,38 @@ jobs: packages: write needs: [ amd64, aarch64 ] steps: - - name: Checkout - uses: actions/checkout@v4 - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - name: Login to ghcr.io + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + + - name: Login to ${{ env.GITHUB_REGISTRY }} uses: docker/login-action@v3 with: - registry: ${{ env.REGISTRY }} + registry: ${{ env.GITHUB_REGISTRY }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - - name: Create & publish manifest + + - name: Login to ${{ env.DOCKER_REGISTRY }} + uses: docker/login-action@v3 + with: + registry: ${{ env.DOCKER_REGISTRY }} + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_TOKEN }} + + - name: Create & publish manifest on ${{ env.DOCKER_REGISTRY }} + run: | + docker buildx imagetools create \ + --append ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}:next-aarch64 \ + --tag ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}:next-amd64 \ + --tag ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}:next + + - name: Create & publish manifest on ${{ env.GITHUB_REGISTRY }} run: | - docker buildx imagetools create --append ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:next-aarch64 --tag ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:next + docker buildx imagetools create \ + --append ${{ env.GITHUB_REGISTRY }}/${{ env.IMAGE_NAME }}:next-aarch64 \ + --tag ${{ env.GITHUB_REGISTRY }}/${{ env.IMAGE_NAME }}:next-amd64 \ + --tag ${{ env.GITHUB_REGISTRY }}/${{ env.IMAGE_NAME }}:next + + - uses: sarisia/actions-status-discord@v1 + if: always() + with: + webhook: ${{ secrets.DISCORD_WEBHOOK_DEV_RELEASE_CHANNEL }} \ No newline at end of file diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index b47cadb..f238b46 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -1,11 +1,12 @@ -name: Sentinel Release +name: Sentinel Release Development on: release: types: [released] env: - REGISTRY: ghcr.io + GITHUB_REGISTRY: ghcr.io + DOCKER_REGISTRY: docker.io IMAGE_NAME: "coollabsio/sentinel" jobs: @@ -16,27 +17,33 @@ jobs: packages: write steps: - uses: actions/checkout@v4 - - name: Login to ghcr.io + + - name: Login to ${{ env.GITHUB_REGISTRY }} uses: docker/login-action@v3 with: - registry: ${{ env.REGISTRY }} + registry: ${{ env.GITHUB_REGISTRY }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - - name: Extract metadata (tags, labels) - id: meta - uses: docker/metadata-action@v5 + + - name: Login to ${{ env.DOCKER_REGISTRY }} + uses: docker/login-action@v3 with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} - tags: | - type=semver,pattern={{version}} - - name: Build and push - uses: docker/build-push-action@v5 + registry: ${{ env.DOCKER_REGISTRY }} + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_TOKEN }} + + - name: Build and Push Image + uses: docker/build-push-action@v6 with: context: . 
file: Dockerfile platforms: linux/amd64 push: true - tags: ${{ steps.meta.outputs.tags }} + tags: | + ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.event.release.tag_name }}-amd64 + ${{ env.GITHUB_REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.event.release.tag_name }}-amd64 + labels: | + coolify.managed=true aarch64: runs-on: [ self-hosted, arm64 ] permissions: @@ -44,27 +51,33 @@ jobs: packages: write steps: - uses: actions/checkout@v4 - - name: Login to ghcr.io + + - name: Login to ${{ env.GITHUB_REGISTRY }} uses: docker/login-action@v3 with: - registry: ${{ env.REGISTRY }} + registry: ${{ env.GITHUB_REGISTRY }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - - name: Extract metadata (tags, labels) - id: meta - uses: docker/metadata-action@v5 + + - name: Login to ${{ env.DOCKER_REGISTRY }} + uses: docker/login-action@v3 with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} - tags: | - type=semver,pattern={{version}}-aarch64 - - name: Build and push - uses: docker/build-push-action@v5 + registry: ${{ env.DOCKER_REGISTRY }} + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_TOKEN }} + + - name: Build and Push Image + uses: docker/build-push-action@v6 with: context: . file: Dockerfile.arm64 platforms: linux/aarch64 push: true - tags: ${{ steps.meta.outputs.tags }}-aarch64 + tags: | + ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.event.release.tag_name }}-aarch64 + ${{ env.GITHUB_REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.event.release.tag_name }}-aarch64 + labels: | + coolify.managed=true merge-manifest: runs-on: ubuntu-latest permissions: @@ -72,26 +85,33 @@ jobs: packages: write needs: [ amd64, aarch64 ] steps: - - name: Checkout - uses: actions/checkout@v4 - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - name: Login to ghcr.io + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + + - name: Login to ${{ env.GITHUB_REGISTRY }} uses: docker/login-action@v3 with: - registry: ${{ env.REGISTRY }} + registry: ${{ env.GITHUB_REGISTRY }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - - name: Extract metadata (tags, labels) - id: meta - uses: docker/metadata-action@v5 + + - name: Login to ${{ env.DOCKER_REGISTRY }} + uses: docker/login-action@v3 with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} - tags: | - type=semver,pattern={{version}} - - name: Create & publish manifest + registry: ${{ env.DOCKER_REGISTRY }} + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_TOKEN }} + + - name: Create & publish manifest on ${{ env.DOCKER_REGISTRY }} + run: | + docker buildx imagetools create \ + --append ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.event.release.tag_name }}-aarch64 \ + --tag ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.event.release.tag_name }}-amd64 \ + --tag ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.event.release.tag_name }} + + - name: Create & publish manifest on ${{ env.GITHUB_REGISTRY }} run: | - docker buildx imagetools create --append ${{ fromJSON(steps.meta.outputs.json).tags[0] }}-aarch64 --tag ${{ fromJSON(steps.meta.outputs.json).tags[0] }} - docker buildx imagetools create --append ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest-aarch64 --tag ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest \ No newline at end of file + docker buildx imagetools create \ + --append ${{ env.GITHUB_REGISTRY }}/${{ 
env.IMAGE_NAME }}:${{ github.event.release.tag_name }}-aarch64 \ + --tag ${{ env.GITHUB_REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.event.release.tag_name }}-amd64 \ + --tag ${{ env.GITHUB_REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.event.release.tag_name }} \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index a9be399..ee1ba20 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,24 +1,20 @@ -FROM golang:1.23-bullseye AS deps +FROM golang:1.23-alpine AS builder WORKDIR /app COPY go.mod go.sum ./ RUN go mod download -FROM golang:1.23-bullseye AS build - -WORKDIR /app -COPY --from=deps /go/pkg/mod /go/pkg/mod COPY . . -RUN apt-get update && apt-get install -y gcc g++ -ENV CGO_ENABLED=1 \ - GOOS=linux \ - GOARCH=amd64 +RUN --mount=type=cache,target=/var/cache/apk \ + apk update && \ + apk add gcc g++ && \ + CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o /app/sentinel ./ -RUN go build -o /app/bin/sentinel ./ +FROM alpine:latest +RUN --mount=type=cache,target=/var/cache/apk \ + apk update && \ + apk add ca-certificates curl -FROM debian:bullseye-slim -RUN apt-get update && apt-get install -y ca-certificates curl && rm -rf /var/lib/apt/lists/* ENV GIN_MODE=release -COPY --from=build /app/ /app -COPY --from=build /app/bin/sentinel /app/sentinel +COPY --from=builder /app/sentinel /app/sentinel CMD ["/app/sentinel"] diff --git a/Dockerfile.arm64 b/Dockerfile.arm64 index ac5f4f8..cf7f83c 100644 --- a/Dockerfile.arm64 +++ b/Dockerfile.arm64 @@ -1,24 +1,20 @@ -FROM golang:1.23-bullseye AS deps +FROM golang:1.23-alpine AS builder WORKDIR /app COPY go.mod go.sum ./ RUN go mod download -FROM golang:1.23-bullseye AS build - -WORKDIR /app -COPY --from=deps /go/pkg/mod /go/pkg/mod COPY . . -RUN apt-get update && apt-get install -y gcc g++ -ENV CGO_ENABLED=1 \ - GOOS=linux \ - GOARCH=arm64 +RUN --mount=type=cache,target=/var/cache/apk \ + apk update && \ + apk add gcc g++ && \ + CGO_ENABLED=1 GOOS=linux GOARCH=arm64 go build -o /app/sentinel ./ -RUN go build -o /app/bin/sentinel ./ +FROM alpine:latest +RUN --mount=type=cache,target=/var/cache/apk \ + apk update && \ + apk add ca-certificates curl -FROM debian:bullseye-slim -RUN apt-get update && apt-get install -y ca-certificates curl && rm -rf /var/lib/apt/lists/* ENV GIN_MODE=release -COPY --from=build /app/ /app -COPY --from=build /app/bin/sentinel /app/sentinel -CMD ["/app/sentinel"] \ No newline at end of file +COPY --from=builder /app/sentinel /app/sentinel +CMD ["/app/sentinel"] diff --git a/cmd/cmd.go b/cmd/cmd.go new file mode 100644 index 0000000..521817d --- /dev/null +++ b/cmd/cmd.go @@ -0,0 +1,209 @@ +package cmd + +import ( + "context" + "errors" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + "github.com/coollabsio/sentinel/pkg/api" + "github.com/coollabsio/sentinel/pkg/collector" + "github.com/coollabsio/sentinel/pkg/config" + "github.com/coollabsio/sentinel/pkg/db" + "github.com/coollabsio/sentinel/pkg/dockerClient" + "github.com/coollabsio/sentinel/pkg/push" + "github.com/gin-gonic/gin" + "github.com/joho/godotenv" + _ "github.com/mattn/go-sqlite3" + "golang.org/x/sync/errgroup" +) + +// HTTP client (Docker) with connection pooling +var dockerHttpClient = dockerClient.New() + +func HandleSignals(ctx context.Context) error { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGTERM, os.Interrupt) + + select { + case s := <-signalChan: + switch s { + case syscall.SIGTERM: + return errors.New("received SIGTERM") + case os.Interrupt: // cross-platform SIGINT + 
return errors.New("received interrupt") + } + case <-ctx.Done(): + return ctx.Err() + } + + return nil +} + +func Execute() error { + config := config.NewDefaultConfig() + if _, err := os.Stat(".env"); os.IsNotExist(err) { + log.Println("No .env file found, skipping load") + } else { + err := godotenv.Load() + if err != nil { + log.Printf("Error loading .env file: %v", err) + } + } + debugFromEnv := os.Getenv("DEBUG") + if debugFromEnv != "" { + var err error + config.Debug, err = strconv.ParseBool(debugFromEnv) + if err != nil { + log.Printf("Error parsing DEBUG: %v", err) + } + } + + endpointFromEnv := os.Getenv("PUSH_ENDPOINT") + if gin.Mode() == gin.DebugMode { + config.MetricsFile = "./db/metrics.sqlite" + + if endpointFromEnv == "" { + config.Endpoint = "http://localhost:8000" + } + } + + if config.Debug && gin.Mode() != gin.DebugMode { + gin.SetMode(gin.DebugMode) + log.Printf("[%s] Debug is enabled.", time.Now().Format("2006-01-02 15:04:05")) + } + + + tokenFromEnv := os.Getenv("TOKEN") + if tokenFromEnv == "" { + return fmt.Errorf("TOKEN environment variable is required") + } + config.Token = tokenFromEnv + + if config.Endpoint == "" && endpointFromEnv == "" { + return fmt.Errorf("PUSH_ENDPOINT environment variable is required") + } + + if config.Endpoint == "" { + config.Endpoint = endpointFromEnv + } + + config.PushUrl = config.Endpoint + config.PushPath + + if pushIntervalSecondsFromEnv := os.Getenv("PUSH_INTERVAL_SECONDS"); pushIntervalSecondsFromEnv != "" { + pushIntervalSecondsInt, err := strconv.Atoi(pushIntervalSecondsFromEnv) + if err != nil { + log.Printf("Error converting PUSH_INTERVAL_SECONDS to int: %v", err) + } else { + config.PushIntervalSeconds = pushIntervalSecondsInt + } + } + if collectorEnabledFromEnv := os.Getenv("COLLECTOR_ENABLED"); collectorEnabledFromEnv != "" { + var err error + config.CollectorEnabled, err = strconv.ParseBool(collectorEnabledFromEnv) + if err != nil { + log.Printf("Error parsing COLLECTOR_ENABLED: %v", err) + } + } + if refreshRateSecondsFromEnv := os.Getenv("COLLECTOR_REFRESH_RATE_SECONDS"); refreshRateSecondsFromEnv != "" { + refreshRateSecondsInt, err := strconv.Atoi(refreshRateSecondsFromEnv) + if err != nil { + log.Printf("Error converting REFRESH_RATE_SECONDS to int: %v", err) + } else { + if refreshRateSecondsInt > 0 { + config.RefreshRateSeconds = refreshRateSecondsInt + } else { + log.Printf("COLLECTOR_REFRESH_RATE_SECONDS must be greater than 0, using default value: %d", config.RefreshRateSeconds) + } + } + } + + if collectorRetentionPeriodDaysFromEnv := os.Getenv("COLLECTOR_RETENTION_PERIOD_DAYS"); collectorRetentionPeriodDaysFromEnv != "" { + collectorRetentionPeriodDaysInt, err := strconv.Atoi(collectorRetentionPeriodDaysFromEnv) + if err != nil { + log.Printf("Error converting COLLECTOR_RETENTION_PERIOD_DAYS to int: %v", err) + } else { + config.CollectorRetentionPeriodDays = collectorRetentionPeriodDaysInt + } + } + + database, err := db.New(config) + if err != nil { + return err + } + + defer database.Close() + + if err := database.CreateDefaultTables(); err != nil { + return err + } + + server := api.New(config, database) + pusherService := push.New(config, dockerHttpClient) + collectorService := collector.New(config, database, dockerHttpClient) + + group, gCtx := errgroup.WithContext(context.Background()) + group.Go(func() error { + return HandleSignals(gCtx) + }) + + // TODO: Do we need to run cleanup if collector is disabled? 
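+    // database.Run prunes rows older than the retention window immediately and
+    // then on a 24h ticker, and it exits when the errgroup context is cancelled,
+    // so starting it while the collector is disabled is harmless.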
+ group.Go(func() error { + database.Run(gCtx) + return nil + }) + + group.Go(func() error { + pusherService.Run(gCtx) + return nil + }) + // Collector + if config.CollectorEnabled { + group.Go(func() error { + collectorService.Run(gCtx) + return nil + }) + } + + group.Go(func() error { + errorChan := make(chan error, 1) + go func() { + defer close(errorChan) + if err := server.Start(); err != nil && err != http.ErrServerClosed { + errorChan <- err + } + }() + select { + case <-gCtx.Done(): + return nil // context cancelled + case err := <-errorChan: + return err + } + }) + if err := group.Wait(); err != nil { + switch err.Error() { + case "received SIGTERM": + log.Println("received SIGTERM shutting down") + case "received interrupt": + log.Println("received interrupt shutting down") + default: + return err // unexpected error we return + } + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Stop(ctx); err != nil { + return err + } + select { + case <-ctx.Done(): + log.Println("server shutdown") + } + return nil +} diff --git a/debug.go b/debug.go deleted file mode 100644 index f9284c0..0000000 --- a/debug.go +++ /dev/null @@ -1,206 +0,0 @@ -package main - -import ( - "fmt" - "log" - "math" - "math/rand/v2" - "time" - - "github.com/gin-gonic/gin" - "github.com/shirou/gopsutil/mem" -) - -func setupDebugRoutes(r *gin.Engine) { - r.GET("/api/export/cpu_usage/csv", func(c *gin.Context) { - rows, err := db.Query("COPY cpu_usage TO 'output/cpu_usage.csv';") - if err != nil { - c.JSON(500, gin.H{"error": err.Error()}) - return - } - defer rows.Close() - - }) - r.GET("/api/load/cpu", func(c *gin.Context) { - createTestCpuData() - c.String(200, "ok, cpu load running in the background") - }) - r.GET("/api/load/memory", func(c *gin.Context) { - createTestMemoryData() - c.String(200, "ok, memory load running in the background") - }) - - r.GET("/api/vacuum", func(c *gin.Context) { - vacuum() - c.String(200, "ok") - }) - - r.GET("/api/checkpoint", func(c *gin.Context) { - checkpoint() - c.String(200, "ok") - }) - - r.GET("/api/stats", func(c *gin.Context) { - var count int - var storageUsage int64 - err := db.QueryRow("SELECT COUNT(*), SUM(LENGTH(time) + LENGTH(percent)) FROM cpu_usage").Scan(&count, &storageUsage) - if err != nil { - c.JSON(500, gin.H{"error": err.Error()}) - return - } - - // Convert storage usage to KB - storageKB := float64(storageUsage) / 1024 - // add memory stats - memory, err := mem.VirtualMemory() - if err != nil { - c.JSON(500, gin.H{"error": err.Error()}) - return - } - - // Query to get table names and their sizes - rows, err := db.Query(` - SELECT - table_name, - SUM(estimated_size) AS total_size - FROM duckdb_tables() - GROUP BY table_name - ORDER BY total_size DESC - `) - if err != nil { - c.JSON(500, gin.H{"error": err.Error()}) - return - } - defer rows.Close() - - var tables []gin.H - for rows.Next() { - var tableName string - var sizeBytes int64 - if err := rows.Scan(&tableName, &sizeBytes); err != nil { - log.Printf("Error scanning row: %v", err) - continue - } - - // Convert bytes to MB for readability - sizeMB := float64(sizeBytes) / (1024 * 1024) - - tables = append(tables, gin.H{ - "table_name": tableName, - "size_mb": fmt.Sprintf("%.2f", sizeMB), - "size_kb": fmt.Sprintf("%.2f", sizeMB*1024), - }) - } - - if err := rows.Err(); err != nil { - c.JSON(500, gin.H{"error": err.Error()}) - return - } - - c.JSON(200, gin.H{ - "row_count": count, - "storage_usage_kb": fmt.Sprintf("%.2f", storageKB), - 
"storage_usage_mb": fmt.Sprintf("%.2f", storageKB/1024), - "memory_usage": memory, - "table_sizes": tables, - }) - }) -} - -func createTestCpuData() { - go func() { - defer func() { - checkpoint() - vacuum() - }() - - numberOfRows := 10000 - numWorkers := 10 - jobs := make(chan int, numberOfRows) - results := make(chan error, numberOfRows) - - // Start worker goroutines - for w := 0; w < numWorkers; w++ { - go func() { - for range jobs { - // Generate a random date within the last month - now := time.Now() - randomDate := now.AddDate(0, 0, -(rand.Int() % 31)) - - timestamp := fmt.Sprintf("%d", randomDate.UnixNano()/int64(time.Millisecond)) - percent := fmt.Sprintf("%.1f", rand.Float64()*100) - _, err := db.Exec(`INSERT INTO cpu_usage (time, percent) VALUES (?, ?)`, timestamp, percent) - results <- err - } - }() - } - - // Send jobs to workers - for i := 0; i < numberOfRows; i++ { - jobs <- i - } - close(jobs) - - // Collect results - for i := 0; i < numberOfRows; i++ { - if err := <-results; err != nil { - log.Printf("Error inserting test data: %v", err) - } - } - }() -} - -func createTestMemoryData() { - go func() { - defer func() { - checkpoint() - vacuum() - }() - - numberOfRows := 10000 - numWorkers := 10 - jobs := make(chan int, numberOfRows) - results := make(chan error, numberOfRows) - - // Start worker goroutines - for w := 0; w < numWorkers; w++ { - go func() { - for range jobs { - // Generate a random date within the last month - now := time.Now() - randomDate := now.AddDate(0, 0, -(rand.Int() % 31)) - - timestamp := fmt.Sprintf("%d", randomDate.UnixNano()/int64(time.Millisecond)) - memory, err := mem.VirtualMemory() - usage := MemUsage{ - Time: timestamp, - Total: memory.Total, - Available: memory.Available, - Used: memory.Used, - UsedPercent: math.Round(memory.UsedPercent*100) / 100, - Free: memory.Free, - } - if err != nil { - log.Printf("Error getting memory usage: %v", err) - continue - } - _, err = db.Exec(`INSERT INTO memory_usage (time, total, available, used, usedPercent, free) VALUES (?, ?, ?, ?, ?, ?)`, usage.Time, usage.Total, usage.Available, usage.Used, usage.UsedPercent, usage.Free) - results <- err - } - }() - } - - // Send jobs to workers - for i := 0; i < numberOfRows; i++ { - jobs <- i - } - close(jobs) - - // Collect results - for i := 0; i < numberOfRows; i++ { - if err := <-results; err != nil { - log.Printf("Error inserting test data: %v", err) - } - } - }() -} diff --git a/go.mod b/go.mod index 0d6b621..33802f7 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module sentinel +module github.com/coollabsio/sentinel go 1.23.1 diff --git a/helpers.go b/helpers.go deleted file mode 100644 index 81b3c35..0000000 --- a/helpers.go +++ /dev/null @@ -1,42 +0,0 @@ -package main - -import ( - "fmt" - "log" - "time" -) - -func getUnixTimeInMilliUTC() string { - queryTimeInUnix := time.Now().UTC().UnixMilli() - queryTimeInUnixString := fmt.Sprintf("%d", queryTimeInUnix) - return queryTimeInUnixString -} - -func vacuum() { - go func() { - defer func() { - if r := recover(); r != nil { - log.Printf("Recovered from panic in vacuum: %v", r) - } - }() - - _, err := db.Exec("VACUUM") - if err != nil { - log.Printf("Error vacuuming: %v", err) - } - }() -} -func checkpoint() { - go func() { - defer func() { - if r := recover(); r != nil { - log.Printf("Recovered from panic in checkpoint: %v", r) - } - }() - - _, err := db.Exec("CHECKPOINT") - if err != nil { - log.Printf("Error checkpointing: %v", err) - } - }() -} diff --git a/json.go b/json.go deleted file mode 100644 index 
7a09e81..0000000 --- a/json.go +++ /dev/null @@ -1,27 +0,0 @@ -package main - -import ( - "io" - - goccyjson "github.com/goccy/go-json" -) - -var JSON jsonWrapper - -type jsonWrapper struct{} - -func (j jsonWrapper) Marshal(v interface{}) ([]byte, error) { - return goccyjson.Marshal(v) -} - -func (j jsonWrapper) Unmarshal(data []byte, v interface{}) error { - return goccyjson.Unmarshal(data, v) -} - -func (j jsonWrapper) NewDecoder(r io.Reader) *goccyjson.Decoder { - return goccyjson.NewDecoder(r) -} - -func (j jsonWrapper) NewEncoder(w io.Writer) *goccyjson.Encoder { - return goccyjson.NewEncoder(w) -} diff --git a/main.go b/main.go index 5c0280c..b167837 100644 --- a/main.go +++ b/main.go @@ -1,380 +1,13 @@ package main import ( - "context" - "database/sql" - "errors" "log" - "net" - "net/http" - "net/http/pprof" - "os" - "os/signal" - "path/filepath" - "strconv" - "syscall" - "time" - "github.com/joho/godotenv" - "github.com/gin-gonic/gin" - _ "github.com/mattn/go-sqlite3" - "golang.org/x/sync/errgroup" + "github.com/coollabsio/sentinel/cmd" ) -var debug bool = false -var refreshRateSeconds int = 5 - -var pushEnabled bool = true -var pushIntervalSeconds int = 60 -var pushPath string = "/api/v1/sentinel/push" -var pushUrl string - -var db *sql.DB -var token string -var endpoint string -var metricsFile string = "/app/db/metrics.sqlite" -var collectorEnabled bool = false -var collectorRetentionPeriodDays int = 7 - -// HTTP client with connection pooling -var httpClient = &http.Client{ - Transport: &http.Transport{ - MaxIdleConns: 100, - MaxIdleConnsPerHost: 100, - IdleConnTimeout: 90 * time.Second, - }, - Timeout: 10 * time.Second, -} - -func Token() gin.HandlerFunc { - return func(c *gin.Context) { - if c.GetHeader("Authorization") != "Bearer "+token { - if gin.Mode() == gin.DebugMode { - if c.Query("token") == token { - c.Next() - return - } - } - c.JSON(401, gin.H{ - "error": "Unauthorized", - }) - c.Abort() - return - } - c.Next() - } -} - -func HandleSignals(ctx context.Context) error { - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, syscall.SIGTERM, os.Interrupt) - - select { - case s := <-signalChan: - switch s { - case syscall.SIGTERM: - return errors.New("received SIGTERM") - case os.Interrupt: // cross-platform SIGINT - return errors.New("received interrupt") - } - case <-ctx.Done(): - return ctx.Err() - } - - return nil -} - func main() { - if _, err := os.Stat(".env"); os.IsNotExist(err) { - log.Println("No .env file found, skipping load") - } else { - err := godotenv.Load() - if err != nil { - log.Printf("Error loading .env file: %v", err) - } - } - if gin.Mode() == gin.DebugMode { - metricsFile = "./db/metrics.sqlite" - } - debugFromEnv := os.Getenv("DEBUG") - if debugFromEnv != "" { - var err error - debug, err = strconv.ParseBool(debugFromEnv) - if err != nil { - log.Printf("Error parsing DEBUG: %v", err) - } - } - if debug { - log.Printf("[%s] Debug is enabled.", time.Now().Format("2006-01-02 15:04:05")) - } - - tokenFromEnv := os.Getenv("TOKEN") - if tokenFromEnv == "" { - log.Fatal("TOKEN environment variable is required") - } - token = tokenFromEnv - - endpointFromEnv := os.Getenv("PUSH_ENDPOINT") - if gin.Mode() == gin.DebugMode { - if endpointFromEnv == "" { - endpoint = "http://localhost:8000" - } else { - endpoint = endpointFromEnv - } - } else { - if endpointFromEnv == "" { - log.Fatal("PUSH_ENDPOINT environment variable is required") - } else { - endpoint = endpointFromEnv - } - } - pushUrl = endpoint + pushPath - - if 
os.Getenv("PUSH_INTERVAL_SECONDS") != "" { - pushIntervalSecondsFromEnv := os.Getenv("PUSH_INTERVAL_SECONDS") - if pushIntervalSecondsFromEnv != "" { - pushIntervalSecondsInt, err := strconv.Atoi(pushIntervalSecondsFromEnv) - if err != nil { - log.Printf("Error converting PUSH_INTERVAL_SECONDS to int: %v", err) - } else { - pushIntervalSeconds = pushIntervalSecondsInt - } - } - } - if os.Getenv("COLLECTOR_ENABLED") != "" { - collectorEnabledFromEnv := os.Getenv("COLLECTOR_ENABLED") - if collectorEnabledFromEnv != "" { - var err error - collectorEnabled, err = strconv.ParseBool(collectorEnabledFromEnv) - if err != nil { - log.Printf("Error parsing COLLECTOR_ENABLED: %v", err) - } - } - } - if os.Getenv("COLLECTOR_REFRESH_RATE_SECONDS") != "" { - refreshRateSecondsFromEnv := os.Getenv("COLLECTOR_REFRESH_RATE_SECONDS") - if refreshRateSecondsFromEnv != "" { - refreshRateSecondsInt, err := strconv.Atoi(refreshRateSecondsFromEnv) - if err != nil { - log.Printf("Error converting REFRESH_RATE_SECONDS to int: %v", err) - } else { - if refreshRateSecondsInt > 0 { - refreshRateSeconds = refreshRateSecondsInt - } else { - log.Printf("COLLECTOR_REFRESH_RATE_SECONDS must be greater than 0, using default value: %d", refreshRateSeconds) - } - } - } - } - - if os.Getenv("COLLECTOR_RETENTION_PERIOD_DAYS") != "" { - collectorRetentionPeriodDaysFromEnv := os.Getenv("COLLECTOR_RETENTION_PERIOD_DAYS") - if collectorRetentionPeriodDaysFromEnv != "" { - collectorRetentionPeriodDaysInt, err := strconv.Atoi(collectorRetentionPeriodDaysFromEnv) - if err != nil { - log.Printf("Error converting COLLECTOR_RETENTION_PERIOD_DAYS to int: %v", err) - } else { - collectorRetentionPeriodDays = collectorRetentionPeriodDaysInt - } - } - } - - // create directory based on metricsFile - dir := filepath.Dir(metricsFile) - if err := os.MkdirAll(dir, 0750); err != nil { - log.Fatal(err) - } - // make sure the directory has 0750 permissions - if err := os.Chmod(dir, 0750); err != nil { + if err := cmd.Execute(); err != nil { log.Fatal(err) } - - var err error - db, err = sql.Open("sqlite3", metricsFile) - if err != nil { - log.Fatal(err) - } - defer db.Close() - - // Create tables - // CPU - _, err = db.Exec(`CREATE TABLE IF NOT EXISTS cpu_usage ( - time VARCHAR, - percent VARCHAR, - PRIMARY KEY (time) - )`) - if err != nil { - log.Fatal(err) - } - // Container CPU - _, err = db.Exec(`CREATE TABLE IF NOT EXISTS container_cpu_usage ( - time VARCHAR, - container_id VARCHAR, - percent VARCHAR, - PRIMARY KEY (time, container_id) - )`) - if err != nil { - log.Fatal(err) - } - // Create an index on the container_cpu_usage table for better query performance - _, err = db.Exec(`CREATE INDEX IF NOT EXISTS idx_container_cpu_usage_time_container_id ON container_cpu_usage (container_id,time)`) - if err != nil { - log.Fatal(err) - } - - // Memory - _, err = db.Exec(`CREATE TABLE IF NOT EXISTS memory_usage ( - time VARCHAR, - total VARCHAR, - available VARCHAR, - used VARCHAR, - usedPercent VARCHAR, - free VARCHAR, - PRIMARY KEY (time) - )`) - if err != nil { - log.Fatal(err) - } - // Container Memory - _, err = db.Exec(`CREATE TABLE IF NOT EXISTS container_memory_usage ( - time VARCHAR, - container_id VARCHAR, - total VARCHAR, - available VARCHAR, - used VARCHAR, - usedPercent VARCHAR, - free VARCHAR, - PRIMARY KEY (time, container_id) - )`) - if err != nil { - log.Fatal(err) - } - // Create an index on the container_memory_usage table for better query performance - _, err = db.Exec(`CREATE INDEX IF NOT EXISTS 
idx_container_memory_usage_time_container_id ON container_memory_usage (time, container_id)`) - if err != nil { - log.Fatal(err) - } - - // Container Logs - _, err = db.Exec(`CREATE TABLE IF NOT EXISTS container_logs (time VARCHAR, container_id VARCHAR, log VARCHAR)`) - if err != nil { - log.Fatal(err) - } - // Create an index on the container_logs table for better query performance - _, err = db.Exec(`CREATE INDEX IF NOT EXISTS idx_container_logs_time_container_id ON container_logs (time, container_id)`) - if err != nil { - log.Fatal(err) - } - - r := gin.Default() - r.GET("/api/health", func(c *gin.Context) { - c.String(200, "ok") - }) - - if gin.Mode() == gin.DebugMode { - setupPushRoute(r) - setupDebugRoutes(r) - setupCpuRoutes(r) - setupContainerRoutes(r) - setupMemoryRoutes(r) - } else { - setupCpuRoutes(r) - setupContainerRoutes(r) - setupMemoryRoutes(r) - } - if debug { - r.GET("/debug/pprof", func(c *gin.Context) { - pprof.Index(c.Writer, c.Request) - }) - r.GET("/debug/cmdline", func(c *gin.Context) { - pprof.Cmdline(c.Writer, c.Request) - }) - r.GET("/debug/profile", func(c *gin.Context) { - pprof.Profile(c.Writer, c.Request) - }) - r.GET("/debug/symbol", func(c *gin.Context) { - pprof.Symbol(c.Writer, c.Request) - }) - r.GET("/debug/trace", func(c *gin.Context) { - pprof.Trace(c.Writer, c.Request) - }) - r.GET("/debug/heap", func(c *gin.Context) { - pprof.Handler("heap").ServeHTTP(c.Writer, c.Request) - }) - r.GET("/debug/goroutine", func(c *gin.Context) { - pprof.Handler("goroutine").ServeHTTP(c.Writer, c.Request) - }) - r.GET("/debug/block", func(c *gin.Context) { - pprof.Handler("block").ServeHTTP(c.Writer, c.Request) - }) - } - group, gCtx := errgroup.WithContext(context.Background()) - group.Go(func() error { - return HandleSignals(gCtx) - }) - group.Go(func() error { - setupPush(gCtx) - return nil - }) - // Collector - if collectorEnabled { - group.Go(func() error { - collector(gCtx) - return nil - }) - } - cleanup() - srv := &http.Server{ - Addr: ":8888", - Handler: r.Handler(), - } - group.Go(func() error { - errorChan := make(chan error, 1) - go func() { - defer close(errorChan) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - errorChan <- err - } - }() - select { - case <-gCtx.Done(): - return nil // context cancelled - case err := <-errorChan: - return err - } - }) - if err := group.Wait(); err != nil { - switch err.Error() { - case "received SIGTERM": - log.Println("received SIGTERM shutting down") - case "received interrupt": - log.Println("received interrupt shutting down") - default: - log.Fatal(err) // unexpected error - } - } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := srv.Shutdown(ctx); err != nil { - log.Fatal(err) // failure/timeout shutting down the server gracefully - } - select { - case <-ctx.Done(): - log.Println("server shutdown") - } -} - -func makeDockerRequest(url string) (*http.Response, error) { - req, err := http.NewRequest("GET", "http://localhost"+url, nil) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", "application/json") - - // Use Unix socket transport - httpClient.Transport.(*http.Transport).DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { - return net.Dial("unix", "/var/run/docker.sock") - } - - return httpClient.Do(req) } diff --git a/pkg/api/api.go b/pkg/api/api.go new file mode 100644 index 0000000..6e775a4 --- /dev/null +++ b/pkg/api/api.go @@ -0,0 +1,39 @@ +package api + +import ( + 
"context" + "net/http" + + "github.com/coollabsio/sentinel/pkg/api/controller" + "github.com/coollabsio/sentinel/pkg/config" + "github.com/coollabsio/sentinel/pkg/db" +) + +type Api struct { + controller *controller.Controller + srv *http.Server +} + +func New(config *config.Config, database *db.Database) *Api { + controller := controller.New(config, database) + controller.SetupRoutes() + if config.Debug { + controller.SetupDebugRoutes() + } + srv := &http.Server{ + Addr: config.BindAddr, + Handler: controller.GetEngine().Handler(), + } + return &Api{ + controller: controller, + srv: srv, + } +} + +func (a *Api) Start() error { + return a.srv.ListenAndServe() +} + +func (a *Api) Stop(ctx context.Context) error { + return a.srv.Shutdown(ctx) +} diff --git a/container.go b/pkg/api/controller/container.go similarity index 70% rename from container.go rename to pkg/api/controller/container.go index db54550..978194a 100644 --- a/container.go +++ b/pkg/api/controller/container.go @@ -1,4 +1,4 @@ -package main +package controller import ( "regexp" @@ -9,27 +9,17 @@ import ( "github.com/gin-gonic/gin" ) -type Container struct { - Time string `json:"time"` - ID string `json:"id"` - Image string `json:"image"` - Name string `json:"name"` - State string `json:"state"` - Labels map[string]string `json:"labels"` - HealthStatus string `json:"health_status"` -} - var containerIdRegex = regexp.MustCompile(`[^a-zA-Z0-9]+`) -func setupContainerRoutes(r *gin.Engine) { - r.GET("/api/container/:containerId/cpu/history", func(c *gin.Context) { - containerID := strings.ReplaceAll(c.Param("containerId"), "/", "") +func (c *Controller) setupContainerRoutes() { + c.ginE.GET("/api/container/:containerId/cpu/history", func(ctx *gin.Context) { + containerID := strings.ReplaceAll(ctx.Param("containerId"), "/", "") containerID = containerIdRegex.ReplaceAllString(containerID, "") - from := c.Query("from") + from := ctx.Query("from") if from == "" { from = "1970-01-01T00:00:01Z" } - to := c.Query("to") + to := ctx.Query("to") if to == "" { to = time.Now().UTC().Format("2006-01-02T15:04:05Z") } @@ -38,13 +28,13 @@ func setupContainerRoutes(r *gin.Engine) { layout := "2006-01-02T15:04:05Z" if from != "" { if _, err := time.Parse(layout, from); err != nil { - c.JSON(400, gin.H{"error": "Invalid 'from' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) + ctx.JSON(400, gin.H{"error": "Invalid 'from' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) return } } if to != "" { if _, err := time.Parse(layout, to); err != nil { - c.JSON(400, gin.H{"error": "Invalid 'to' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) + ctx.JSON(400, gin.H{"error": "Invalid 'to' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) return } } @@ -68,9 +58,9 @@ func setupContainerRoutes(r *gin.Engine) { params = append(params, toTime.UnixMilli()) } query += " ORDER BY CAST(time AS BIGINT) ASC" - rows, err := db.Query(query, params...) + rows, err := c.database.Query(query, params...) 
if err != nil { - c.JSON(500, gin.H{"error": err.Error()}) + ctx.JSON(500, gin.H{"error": err.Error()}) return } defer rows.Close() @@ -80,7 +70,7 @@ func setupContainerRoutes(r *gin.Engine) { var usage CpuUsage var containerID string if err := rows.Scan(&usage.Time, &containerID, &usage.Percent); err != nil { - c.JSON(500, gin.H{"error": err.Error()}) + ctx.JSON(500, gin.H{"error": err.Error()}) return } timeInt, _ := strconv.ParseInt(usage.Time, 10, 64) @@ -89,16 +79,16 @@ func setupContainerRoutes(r *gin.Engine) { } usages = append(usages, usage) } - c.JSON(200, usages) + ctx.JSON(200, usages) }) - r.GET("/api/container/:containerId/memory/history", func(c *gin.Context) { - containerID := strings.ReplaceAll(c.Param("containerId"), "/", "") + c.ginE.GET("/api/container/:containerId/memory/history", func(ctx *gin.Context) { + containerID := strings.ReplaceAll(ctx.Param("containerId"), "/", "") containerID = regexp.MustCompile(`[^a-zA-Z0-9]+`).ReplaceAllString(containerID, "") - from := c.Query("from") + from := ctx.Query("from") if from == "" { from = "1970-01-01T00:00:01Z" } - to := c.Query("to") + to := ctx.Query("to") if to == "" { to = time.Now().UTC().Format("2006-01-02T15:04:05Z") } @@ -107,13 +97,13 @@ func setupContainerRoutes(r *gin.Engine) { layout := "2006-01-02T15:04:05Z" if from != "" { if _, err := time.Parse(layout, from); err != nil { - c.JSON(400, gin.H{"error": "Invalid 'from' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) + ctx.JSON(400, gin.H{"error": "Invalid 'from' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) return } } if to != "" { if _, err := time.Parse(layout, to); err != nil { - c.JSON(400, gin.H{"error": "Invalid 'to' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) + ctx.JSON(400, gin.H{"error": "Invalid 'to' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) return } } @@ -137,9 +127,9 @@ func setupContainerRoutes(r *gin.Engine) { params = append(params, toTime.UnixMilli()) } query += " ORDER BY CAST(time AS BIGINT) ASC" - rows, err := db.Query(query, params...) + rows, err := c.database.Query(query, params...) 
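+    // container_id is scanned only to satisfy the SELECT column list; the
+    // endpoint already filters to a single container, so it is not echoed back.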
if err != nil { - c.JSON(500, gin.H{"error": err.Error()}) + ctx.JSON(500, gin.H{"error": err.Error()}) return } defer rows.Close() @@ -149,7 +139,7 @@ func setupContainerRoutes(r *gin.Engine) { var usage MemUsage var containerID string if err := rows.Scan(&usage.Time, &containerID, &usage.Total, &usage.Available, &usage.Used, &usage.UsedPercent, &usage.Free); err != nil { - c.JSON(500, gin.H{"error": err.Error()}) + ctx.JSON(500, gin.H{"error": err.Error()}) return } timeInt, _ := strconv.ParseInt(usage.Time, 10, 64) @@ -158,6 +148,6 @@ func setupContainerRoutes(r *gin.Engine) { } usages = append(usages, usage) } - c.JSON(200, usages) + ctx.JSON(200, usages) }) } diff --git a/pkg/api/controller/controller.go b/pkg/api/controller/controller.go new file mode 100644 index 0000000..7fa6f15 --- /dev/null +++ b/pkg/api/controller/controller.go @@ -0,0 +1,70 @@ +package controller + +import ( + "net/http/pprof" + + "github.com/coollabsio/sentinel/pkg/config" + "github.com/coollabsio/sentinel/pkg/db" + "github.com/gin-gonic/gin" +) + +type Controller struct { + database *db.Database + ginE *gin.Engine + config *config.Config +} + +func New(config *config.Config, database *db.Database) *Controller { + return &Controller{ + database: database, + ginE: gin.Default(), + config: config, + } +} + +func (c *Controller) GetEngine() *gin.Engine { + return c.ginE +} + +func (c *Controller) SetupRoutes() { + c.setupHealthRoutes() + c.setupContainerRoutes() + c.setupMemoryRoutes() + c.setupCpuRoutes() +} + +func (c *Controller) setupHealthRoutes() { + c.ginE.GET("/api/health", func(c *gin.Context) { + c.String(200, "ok") + }) +} + +// TODO: Implement c.setupPushRoutes() +func (c *Controller) SetupDebugRoutes() { + c.setupDebugRoutes() + debugGroup := c.ginE.Group("/debug") + debugGroup.GET("/pprof", func(c *gin.Context) { + pprof.Index(c.Writer, c.Request) + }) + debugGroup.GET("/cmdline", func(c *gin.Context) { + pprof.Cmdline(c.Writer, c.Request) + }) + debugGroup.GET("/profile", func(c *gin.Context) { + pprof.Profile(c.Writer, c.Request) + }) + debugGroup.GET("/symbol", func(c *gin.Context) { + pprof.Symbol(c.Writer, c.Request) + }) + debugGroup.GET("/trace", func(c *gin.Context) { + pprof.Trace(c.Writer, c.Request) + }) + debugGroup.GET("/heap", func(c *gin.Context) { + pprof.Handler("heap").ServeHTTP(c.Writer, c.Request) + }) + debugGroup.GET("/goroutine", func(c *gin.Context) { + pprof.Handler("goroutine").ServeHTTP(c.Writer, c.Request) + }) + debugGroup.GET("/block", func(c *gin.Context) { + pprof.Handler("block").ServeHTTP(c.Writer, c.Request) + }) +} diff --git a/cpu.go b/pkg/api/controller/cpu.go similarity index 68% rename from cpu.go rename to pkg/api/controller/cpu.go index 1ea7e6f..c6f05a2 100644 --- a/cpu.go +++ b/pkg/api/controller/cpu.go @@ -1,9 +1,10 @@ -package main +package controller import ( "strconv" "time" + "github.com/coollabsio/sentinel/pkg/utils" "github.com/gin-gonic/gin" "github.com/shirou/gopsutil/cpu" ) @@ -14,22 +15,22 @@ type CpuUsage struct { HumanFriendlyTime string `json:"human_friendly_time,omitempty"` } -func setupCpuRoutes(r *gin.Engine) { - r.GET("/api/cpu/current", func(c *gin.Context) { - queryTimeInUnixString := getUnixTimeInMilliUTC() +func (c *Controller) setupCpuRoutes() { + c.ginE.GET("/api/cpu/current", func(ctx *gin.Context) { + queryTimeInUnixString := utils.GetUnixTimeInMilliUTC() overallPercentage, err := cpu.Percent(0, false) if err != nil { - c.JSON(500, gin.H{"error": err.Error()}) + ctx.JSON(500, gin.H{"error": err.Error()}) return } - c.JSON(200, 
gin.H{"time": queryTimeInUnixString, "percent": overallPercentage[0]}) + ctx.JSON(200, gin.H{"time": queryTimeInUnixString, "percent": overallPercentage[0]}) }) - r.GET("/api/cpu/history", func(c *gin.Context) { - from := c.Query("from") + c.ginE.GET("/api/cpu/history", func(ctx *gin.Context) { + from := ctx.Query("from") if from == "" { from = "1970-01-01T00:00:00Z" } - to := c.Query("to") + to := ctx.Query("to") if to == "" { to = time.Now().UTC().Format("2006-01-02T15:04:05Z") } @@ -38,13 +39,13 @@ func setupCpuRoutes(r *gin.Engine) { layout := "2006-01-02T15:04:05Z" if from != "" { if _, err := time.Parse(layout, from); err != nil { - c.JSON(400, gin.H{"error": "Invalid 'from' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) + ctx.JSON(400, gin.H{"error": "Invalid 'from' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) return } } if to != "" { if _, err := time.Parse(layout, to); err != nil { - c.JSON(400, gin.H{"error": "Invalid 'to' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) + ctx.JSON(400, gin.H{"error": "Invalid 'to' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) return } } @@ -67,9 +68,9 @@ func setupCpuRoutes(r *gin.Engine) { params = append(params, toTime.UnixMilli()) } query += " ORDER BY CAST(time AS BIGINT) ASC" - rows, err := db.Query(query, params...) + rows, err := c.database.Query(query, params...) if err != nil { - c.JSON(500, gin.H{"error": err.Error()}) + ctx.JSON(500, gin.H{"error": err.Error()}) return } defer rows.Close() @@ -78,7 +79,7 @@ func setupCpuRoutes(r *gin.Engine) { for rows.Next() { var usage CpuUsage if err := rows.Scan(&usage.Time, &usage.Percent); err != nil { - c.JSON(500, gin.H{"error": err.Error()}) + ctx.JSON(500, gin.H{"error": err.Error()}) return } timeInt, _ := strconv.ParseInt(usage.Time, 10, 64) @@ -87,6 +88,6 @@ func setupCpuRoutes(r *gin.Engine) { } usages = append(usages, usage) } - c.JSON(200, usages) + ctx.JSON(200, usages) }) } diff --git a/pkg/api/controller/debug.go b/pkg/api/controller/debug.go new file mode 100644 index 0000000..9f6e2b3 --- /dev/null +++ b/pkg/api/controller/debug.go @@ -0,0 +1,77 @@ +package controller + +import ( + "fmt" + "log" + + "github.com/gin-gonic/gin" + "github.com/shirou/gopsutil/mem" +) + +func (c *Controller) setupDebugRoutes() { + c.ginE.GET("/api/stats", func(ctx *gin.Context) { + var count int + var storageUsage int64 + err := c.database.QueryRow("SELECT COUNT(*), SUM(LENGTH(time) + LENGTH(percent)) FROM cpu_usage").Scan(&count, &storageUsage) + if err != nil { + ctx.JSON(500, gin.H{"error": err.Error()}) + return + } + + // Convert storage usage to KB + storageKB := float64(storageUsage) / 1024 + // add memory stats + memory, err := mem.VirtualMemory() + if err != nil { + ctx.JSON(500, gin.H{"error": err.Error()}) + return + } + + // Query to get table names and their sizes + rows, err := c.database.Query(` + SELECT + table_name, + SUM(estimated_size) AS total_size + FROM duckdb_tables() + GROUP BY table_name + ORDER BY total_size DESC + `) + if err != nil { + ctx.JSON(500, gin.H{"error": err.Error()}) + return + } + defer rows.Close() + + var tables []gin.H + for rows.Next() { + var tableName string + var sizeBytes int64 + if err := rows.Scan(&tableName, &sizeBytes); err != nil { + log.Printf("Error scanning row: %v", err) + continue + } + + // Convert bytes to MB for readability + sizeMB := float64(sizeBytes) / (1024 * 1024) + + tables = append(tables, gin.H{ + "table_name": tableName, + "size_mb": fmt.Sprintf("%.2f", sizeMB), + "size_kb": fmt.Sprintf("%.2f", sizeMB*1024), + }) + } + + if err := rows.Err(); err != nil { 
+ ctx.JSON(500, gin.H{"error": err.Error()}) + return + } + + ctx.JSON(200, gin.H{ + "row_count": count, + "storage_usage_kb": fmt.Sprintf("%.2f", storageKB), + "storage_usage_mb": fmt.Sprintf("%.2f", storageKB/1024), + "memory_usage": memory, + "table_sizes": tables, + }) + }) +} \ No newline at end of file diff --git a/memory.go b/pkg/api/controller/memory.go similarity index 74% rename from memory.go rename to pkg/api/controller/memory.go index fb8e551..ed953aa 100644 --- a/memory.go +++ b/pkg/api/controller/memory.go @@ -1,10 +1,11 @@ -package main +package controller import ( "math" "strconv" "time" + "github.com/coollabsio/sentinel/pkg/utils" "github.com/gin-gonic/gin" "github.com/shirou/gopsutil/mem" ) @@ -19,12 +20,12 @@ type MemUsage struct { HumanFriendlyTime string `json:"human_friendly_time,omitempty"` } -func setupMemoryRoutes(r *gin.Engine) { - r.GET("/api/memory/current", func(c *gin.Context) { - queryTimeInUnixString := getUnixTimeInMilliUTC() +func (c *Controller) setupMemoryRoutes() { + c.ginE.GET("/api/memory/current", func(ctx *gin.Context) { + queryTimeInUnixString := utils.GetUnixTimeInMilliUTC() memory, err := mem.VirtualMemory() if err != nil { - c.JSON(500, gin.H{"error": err.Error()}) + ctx.JSON(500, gin.H{"error": err.Error()}) return } @@ -36,14 +37,14 @@ func setupMemoryRoutes(r *gin.Engine) { UsedPercent: math.Round(memory.UsedPercent*100) / 100, Free: memory.Free, } - c.JSON(200, usage) + ctx.JSON(200, usage) }) - r.GET("/api/memory/history", func(c *gin.Context) { - from := c.Query("from") + c.ginE.GET("/api/memory/history", func(ctx *gin.Context) { + from := ctx.Query("from") if from == "" { from = "1970-01-01T00:00:00Z" } - to := c.Query("to") + to := ctx.Query("to") if to == "" { to = time.Now().UTC().Format("2006-01-02T15:04:05Z") } @@ -52,13 +53,13 @@ func setupMemoryRoutes(r *gin.Engine) { layout := "2006-01-02T15:04:05Z" if from != "" { if _, err := time.Parse(layout, from); err != nil { - c.JSON(400, gin.H{"error": "Invalid 'from' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) + ctx.JSON(400, gin.H{"error": "Invalid 'from' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) return } } if to != "" { if _, err := time.Parse(layout, to); err != nil { - c.JSON(400, gin.H{"error": "Invalid 'to' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) + ctx.JSON(400, gin.H{"error": "Invalid 'to' date format. Use YYYY-MM-DDTHH:MM:SSZ"}) return } } @@ -81,9 +82,9 @@ func setupMemoryRoutes(r *gin.Engine) { params = append(params, toTime.UnixMilli()) } query += " ORDER BY CAST(time AS BIGINT) ASC" - rows, err := db.Query(query, params...) + rows, err := c.database.Query(query, params...) 
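+    // memory_usage stores every value as VARCHAR; the byte counts are converted
+    // back into MemUsage's numeric fields when the rows are scanned below.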
if err != nil { - c.JSON(500, gin.H{"error": err.Error()}) + ctx.JSON(500, gin.H{"error": err.Error()}) return } defer rows.Close() @@ -92,7 +93,7 @@ func setupMemoryRoutes(r *gin.Engine) { for rows.Next() { var usage MemUsage if err := rows.Scan(&usage.Time, &usage.Total, &usage.Available, &usage.Used, &usage.UsedPercent, &usage.Free); err != nil { - c.JSON(500, gin.H{"error": err.Error()}) + ctx.JSON(500, gin.H{"error": err.Error()}) return } timeInt, _ := strconv.ParseInt(usage.Time, 10, 64) @@ -101,6 +102,6 @@ func setupMemoryRoutes(r *gin.Engine) { } usages = append(usages, usage) } - c.JSON(200, usages) + ctx.JSON(200, usages) }) } diff --git a/pkg/api/controller/push.go b/pkg/api/controller/push.go new file mode 100644 index 0000000..058b63b --- /dev/null +++ b/pkg/api/controller/push.go @@ -0,0 +1,19 @@ +package controller + +//Comment out till I figure out how to pass pusher to controller context + +// func (c *Controller) setupPushRoute() { +// c.ginE.POST("/api/push", func(ctx *gin.Context) { +// incomingToken := ctx.GetHeader("Authorization") +// if incomingToken != "Bearer "+c.config.Token { +// ctx.JSON(401, gin.H{"error": "Unauthorized"}) +// return +// } +// data, err := push.GetPushData() +// if err != nil { +// ctx.JSON(500, gin.H{"error": err.Error()}) +// return +// } +// ctx.JSON(200, data) +// }) +// } diff --git a/collector.go b/pkg/collector/collector.go similarity index 61% rename from collector.go rename to pkg/collector/collector.go index a5b4123..e82183f 100644 --- a/collector.go +++ b/pkg/collector/collector.go @@ -1,4 +1,4 @@ -package main +package collector import ( "context" @@ -9,24 +9,35 @@ import ( "sync" "time" - "github.com/docker/docker/api/types" + "github.com/coollabsio/sentinel/pkg/config" + "github.com/coollabsio/sentinel/pkg/db" + "github.com/coollabsio/sentinel/pkg/dockerClient" + "github.com/coollabsio/sentinel/pkg/json" + "github.com/coollabsio/sentinel/pkg/types" + "github.com/coollabsio/sentinel/pkg/utils" + dockerTypes "github.com/docker/docker/api/types" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/mem" ) -type ContainerMetrics struct { - Name string `json:"name"` - Time string `json:"time"` - CPUUsagePercentage float64 `json:"cpu_usage_percentage"` - MemoryUsagePercentage float64 `json:"memory_usage_percentage"` - MemoryUsed uint64 `json:"memory_used"` - MemoryAvailable uint64 `json:"available_memory"` +type Collector struct { + config *config.Config + client *dockerClient.DockerClient + database *db.Database } -func collector(ctx context.Context) { - fmt.Printf("[%s] Starting metrics recorder with refresh rate of %d seconds and retention period of %d days.\n", time.Now().Format("2006-01-02 15:04:05"), refreshRateSeconds, collectorRetentionPeriodDays) +func New(config *config.Config, database *db.Database, dockerClient *dockerClient.DockerClient) *Collector { + return &Collector{ + config: config, + client: dockerClient, + database: database, + } +} - ticker := time.NewTicker(time.Duration(refreshRateSeconds) * time.Second) +func (c *Collector) Run(ctx context.Context) { + fmt.Printf("[%s] Starting metrics recorder with refresh rate of %d seconds and retention period of %d days.\n", time.Now().Format("2006-01-02 15:04:05"), c.config.RefreshRateSeconds, c.config.CollectorRetentionPeriodDays) + + ticker := time.NewTicker(time.Duration(c.config.RefreshRateSeconds) * time.Second) defer ticker.Stop() for { @@ -42,7 +53,7 @@ func collector(ctx context.Context) { } }() - queryTimeInUnixString := getUnixTimeInMilliUTC() + 
queryTimeInUnixString := utils.GetUnixTimeInMilliUTC() // CPU usage overallPercentage, err := cpu.Percent(0, false) @@ -51,12 +62,12 @@ func collector(ctx context.Context) { return } - _, err = db.Exec(`INSERT INTO cpu_usage (time, percent) VALUES (?, ?)`, queryTimeInUnixString, fmt.Sprintf("%.2f", overallPercentage[0])) + _, err = c.database.Exec(`INSERT INTO cpu_usage (time, percent) VALUES (?, ?)`, queryTimeInUnixString, fmt.Sprintf("%.2f", overallPercentage[0])) if err != nil { log.Printf("Error inserting CPU usage into database: %v", err) } - collectContainerMetrics(queryTimeInUnixString) + c.collectContainerMetrics(queryTimeInUnixString) // Memory usage memory, err := mem.VirtualMemory() @@ -65,7 +76,7 @@ func collector(ctx context.Context) { return } - _, err = db.Exec(`INSERT INTO memory_usage (time, total, available, used, usedPercent, free) VALUES (?, ?, ?, ?, ?, ?)`, + _, err = c.database.Exec(`INSERT INTO memory_usage (time, total, available, used, usedPercent, free) VALUES (?, ?, ?, ?, ?, ?)`, queryTimeInUnixString, memory.Total, memory.Available, memory.Used, math.Round(memory.UsedPercent*100)/100, memory.Free) if err != nil { log.Printf("Error inserting memory usage into database: %v", err) @@ -74,18 +85,18 @@ func collector(ctx context.Context) { // Cleanup old data totalRowsToKeep := 10 currentTime := time.Now().UTC().UnixMilli() - cutoffTime := currentTime - int64(collectorRetentionPeriodDays*24*60*60*1000) + cutoffTime := currentTime - int64(c.config.CollectorRetentionPeriodDays*24*60*60*1000) cleanupTable := func(tableName string) { var totalRows int - err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName)).Scan(&totalRows) + err := c.database.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName)).Scan(&totalRows) if err != nil { log.Printf("Error counting rows in %s: %v", tableName, err) return } if totalRows > totalRowsToKeep { - _, err = db.Exec(fmt.Sprintf(`DELETE FROM %s WHERE CAST(time AS BIGINT) < ? AND time NOT IN (SELECT time FROM %s ORDER BY time DESC LIMIT ?)`, tableName, tableName), + _, err = c.database.Exec(fmt.Sprintf(`DELETE FROM %s WHERE CAST(time AS BIGINT) < ? 
AND time NOT IN (SELECT time FROM %s ORDER BY time DESC LIMIT ?)`, tableName, tableName), cutoffTime, totalRowsToKeep) if err != nil { log.Printf("Error deleting old data from %s: %v", tableName, err) @@ -103,10 +114,10 @@ func collector(ctx context.Context) { } } -func collectContainerMetrics(queryTimeInUnixString string) { +func (c *Collector) collectContainerMetrics(queryTimeInUnixString string) { // Container usage // Use makeDockerRequest to interact with Docker API - resp, err := makeDockerRequest("/containers/json?all=true") + resp, err := c.client.MakeRequest("/containers/json?all=true") if err != nil { log.Printf("Error getting containers: %v", err) return @@ -124,26 +135,26 @@ func collectContainerMetrics(queryTimeInUnixString string) { return } - var containers []types.Container - if err := JSON.Unmarshal(containersOutput, &containers); err != nil { + var containers []dockerTypes.Container + if err := json.Unmarshal(containersOutput, &containers); err != nil { log.Printf("Error unmarshalling container list: %v", err) return } var wg sync.WaitGroup - metricsChannel := make(chan ContainerMetrics, len(containers)) + metricsChannel := make(chan types.ContainerMetrics, len(containers)) errChannel := make(chan error, len(containers)) for _, container := range containers { wg.Add(1) - go func(container types.Container) { + go func(container dockerTypes.Container) { defer wg.Done() containerNameFromLabel := container.Labels["coolify.name"] if containerNameFromLabel == "" { containerNameFromLabel = container.Names[0][1:] } - resp, err := makeDockerRequest(fmt.Sprintf("/containers/%s/stats?stream=false", container.ID)) + resp, err := c.client.MakeRequest(fmt.Sprintf("/containers/%s/stats?stream=false", container.ID)) if err != nil { errChannel <- fmt.Errorf("Error getting container stats for %s: %v", containerNameFromLabel, err) return @@ -156,13 +167,13 @@ func collectContainerMetrics(queryTimeInUnixString string) { return } - var v types.StatsJSON - if err := JSON.Unmarshal(statsOutput, &v); err != nil { + var v dockerTypes.StatsJSON + if err := json.Unmarshal(statsOutput, &v); err != nil { errChannel <- fmt.Errorf("Error decoding container stats for %s: %v", containerNameFromLabel, err) return } - metrics := ContainerMetrics{ + metrics := types.ContainerMetrics{ Name: containerNameFromLabel, CPUUsagePercentage: calculateCPUPercent(v), MemoryUsagePercentage: calculateMemoryPercent(v), @@ -185,13 +196,13 @@ func collectContainerMetrics(queryTimeInUnixString string) { } for metrics := range metricsChannel { - _, err = db.Exec(`INSERT INTO container_cpu_usage (time, container_id, percent) VALUES (?, ?, ?)`, + _, err = c.database.Exec(`INSERT INTO container_cpu_usage (time, container_id, percent) VALUES (?, ?, ?)`, queryTimeInUnixString, metrics.Name, fmt.Sprintf("%.2f", metrics.CPUUsagePercentage)) if err != nil { log.Printf("Error inserting container CPU usage into database: %v", err) } - _, err = db.Exec(`INSERT INTO container_memory_usage (time, container_id, total, available, used, usedPercent, free) VALUES (?, ?, ?, ?, ?, ?, ?)`, + _, err = c.database.Exec(`INSERT INTO container_memory_usage (time, container_id, total, available, used, usedPercent, free) VALUES (?, ?, ?, ?, ?, ?, ?)`, queryTimeInUnixString, metrics.Name, metrics.MemoryAvailable, metrics.MemoryAvailable, metrics.MemoryUsed, metrics.MemoryUsagePercentage, metrics.MemoryAvailable-metrics.MemoryUsed) if err != nil { log.Printf("Error inserting container memory usage into database: %v", err) @@ -199,42 +210,19 @@ func 
collectContainerMetrics(queryTimeInUnixString string) { } } -func cleanup() { - fmt.Printf("[%s] Removing old data.\n", time.Now().Format("2006-01-02 15:04:05")) - - cutoffTime := time.Now().AddDate(0, 0, -collectorRetentionPeriodDays).UnixMilli() - - _, err := db.Exec(`DELETE FROM cpu_usage WHERE CAST(time AS BIGINT) < ?`, cutoffTime) - if err != nil { - log.Printf("Error removing old data: %v", err) - } - - _, err = db.Exec(`DELETE FROM memory_usage WHERE CAST(time AS BIGINT) < ?`, cutoffTime) - if err != nil { - log.Printf("Error removing old memory data: %v", err) - } - - go func() { - for { - time.Sleep(24 * time.Hour) - cleanup() - } - }() -} - -func calculateCPUPercent(stat types.StatsJSON) float64 { +func calculateCPUPercent(stat dockerTypes.StatsJSON) float64 { cpuDelta := float64(stat.CPUStats.CPUUsage.TotalUsage) - float64(stat.PreCPUStats.CPUUsage.TotalUsage) systemDelta := float64(stat.CPUStats.SystemUsage) - float64(stat.PreCPUStats.SystemUsage) numberOfCpus := stat.CPUStats.OnlineCPUs return (cpuDelta / systemDelta) * float64(numberOfCpus) * 100.0 } -func calculateMemoryPercent(stat types.StatsJSON) float64 { +func calculateMemoryPercent(stat dockerTypes.StatsJSON) float64 { usedMemory := float64(stat.MemoryStats.Usage) - float64(stat.MemoryStats.Stats["cache"]) availableMemory := float64(stat.MemoryStats.Limit) return (usedMemory / availableMemory) * 100.0 } -func calculateMemoryUsed(stat types.StatsJSON) uint64 { +func calculateMemoryUsed(stat dockerTypes.StatsJSON) uint64 { return (stat.MemoryStats.Usage) / 1024 / 1024 } diff --git a/pkg/config/config.go b/pkg/config/config.go new file mode 100644 index 0000000..7d0a883 --- /dev/null +++ b/pkg/config/config.go @@ -0,0 +1,33 @@ +package config + +type Config struct { + Debug bool + RefreshRateSeconds int + PushEnabled bool + PushIntervalSeconds int + PushPath string + PushUrl string + Token string + Endpoint string + MetricsFile string + CollectorEnabled bool + CollectorRetentionPeriodDays int + BindAddr string +} + +func NewDefaultConfig() *Config { + return &Config{ + Debug: false, + RefreshRateSeconds: 5, + PushEnabled: true, + PushIntervalSeconds: 60, + PushPath: "/api/v1/sentinel/push", + PushUrl: "", + Token: "", + Endpoint: "", + MetricsFile: "/app/db/metrics.sqlite", + CollectorEnabled: false, + CollectorRetentionPeriodDays: 7, + BindAddr: ":8888", + } +} diff --git a/pkg/db/database.go b/pkg/db/database.go new file mode 100644 index 0000000..ab0a065 --- /dev/null +++ b/pkg/db/database.go @@ -0,0 +1,171 @@ +package db + +import ( + "context" + "database/sql" + "fmt" + "log" + "os" + "path/filepath" + "time" + + "github.com/coollabsio/sentinel/pkg/config" +) + +type Database struct { + db *sql.DB + config *config.Config +} + +func New(config *config.Config) (*Database, error) { + // create directory based on metricsFile + dir := filepath.Dir(config.MetricsFile) + if err := os.MkdirAll(dir, 0750); err != nil { + return nil, err + } + // make sure the directory has 0750 permissions + if err := os.Chmod(dir, 0750); err != nil { + return nil, err + } + + db, err := sql.Open("sqlite3", config.MetricsFile) + if err != nil { + return nil, err + } + + return &Database{ + db: db, + config: config, + }, nil +} + +func (d *Database) Run(ctx context.Context) { + ticker := time.NewTicker(24 * time.Hour) + defer ticker.Stop() + + d.Cleanup() + + for { + select { + case <-ctx.Done(): + fmt.Println("Database cleanup stopped") + return + case <-ticker.C: + d.Cleanup() + } + } +} + +func (d *Database) Cleanup() { + fmt.Printf("[%s] 
Removing old data.\n", time.Now().Format("2006-01-02 15:04:05")) + + cutoffTime := time.Now().AddDate(0, 0, -d.config.CollectorRetentionPeriodDays).UnixMilli() + + _, err := d.db.Exec(`DELETE FROM cpu_usage WHERE CAST(time AS BIGINT) < ?`, cutoffTime) + if err != nil { + log.Printf("Error removing old data: %v", err) + } + + _, err = d.db.Exec(`DELETE FROM memory_usage WHERE CAST(time AS BIGINT) < ?`, cutoffTime) + if err != nil { + log.Printf("Error removing old memory data: %v", err) + } +} + +func (d *Database) Close() error { + return d.db.Close() +} + +func (d *Database) Exec(query string, args ...interface{}) (sql.Result, error) { + return d.db.Exec(query, args...) +} + +func (d *Database) Query(query string, args ...interface{}) (*sql.Rows, error) { + return d.db.Query(query, args...) +} + +func (d *Database) QueryRow(query string, args ...interface{}) *sql.Row { + return d.db.QueryRow(query, args...) +} + +func (d *Database) Vacuum() error { + _, err := d.db.Exec("VACUUM") + return err +} + +func (d *Database) Checkpoint() error { + _, err := d.db.Exec("CHECKPOINT") + return err +} + +func (d *Database) CreateDefaultTables() error { + var err error + _, err = d.db.Exec(`CREATE TABLE IF NOT EXISTS cpu_usage ( + time VARCHAR, + percent VARCHAR, + PRIMARY KEY (time) + )`) + if err != nil { + return err + } + // Container CPU + _, err = d.db.Exec(`CREATE TABLE IF NOT EXISTS container_cpu_usage ( + time VARCHAR, + container_id VARCHAR, + percent VARCHAR, + PRIMARY KEY (time, container_id) + )`) + if err != nil { + return err + } + // Create an index on the container_cpu_usage table for better query performance + _, err = d.db.Exec(`CREATE INDEX IF NOT EXISTS idx_container_cpu_usage_time_container_id ON container_cpu_usage (container_id,time)`) + if err != nil { + return err + } + + // Memory + _, err = d.db.Exec(`CREATE TABLE IF NOT EXISTS memory_usage ( + time VARCHAR, + total VARCHAR, + available VARCHAR, + used VARCHAR, + usedPercent VARCHAR, + free VARCHAR, + PRIMARY KEY (time) + )`) + if err != nil { + return err + } + // Container Memory + _, err = d.db.Exec(`CREATE TABLE IF NOT EXISTS container_memory_usage ( + time VARCHAR, + container_id VARCHAR, + total VARCHAR, + available VARCHAR, + used VARCHAR, + usedPercent VARCHAR, + free VARCHAR, + PRIMARY KEY (time, container_id) + )`) + if err != nil { + return err + } + // Create an index on the container_memory_usage table for better query performance + _, err = d.db.Exec(`CREATE INDEX IF NOT EXISTS idx_container_memory_usage_time_container_id ON container_memory_usage (time, container_id)`) + if err != nil { + return err + } + + // Container Logs + _, err = d.db.Exec(`CREATE TABLE IF NOT EXISTS container_logs (time VARCHAR, container_id VARCHAR, log VARCHAR)`) + if err != nil { + return err + } + // Create an index on the container_logs table for better query performance + _, err = d.db.Exec(`CREATE INDEX IF NOT EXISTS idx_container_logs_time_container_id ON container_logs (time, container_id)`) + if err != nil { + return err + } + return nil +} diff --git a/pkg/dockerClient/dockerClient.go b/pkg/dockerClient/dockerClient.go new file mode 100644 index 0000000..1fb7fea --- /dev/null +++ b/pkg/dockerClient/dockerClient.go @@ -0,0 +1,40 @@ +package dockerClient + +import ( + "context" + "net" + "net/http" + "time" +) + +/* Docker client is a wrapper around http.Client to easily format request to docker socket */ +type DockerClient struct { + httpClient *http.Client +} + +func New() *DockerClient { + return &DockerClient{ + httpClient: 
&http.Client{ + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return net.Dial("unix", "/var/run/docker.sock") + }, + }, + Timeout: 10 * time.Second, + }, + } +} + +func (d *DockerClient) MakeRequest(url string) (*http.Response, error) { + req, err := http.NewRequest("GET", "http://localhost"+url, nil) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "application/json") + + return d.httpClient.Do(req) +} diff --git a/pkg/json/json.go b/pkg/json/json.go new file mode 100644 index 0000000..2242562 --- /dev/null +++ b/pkg/json/json.go @@ -0,0 +1,23 @@ +package json + +import ( + "io" + + goccyjson "github.com/goccy/go-json" +) + +func Marshal(v interface{}) ([]byte, error) { + return goccyjson.Marshal(v) +} + +func Unmarshal(data []byte, v interface{}) error { + return goccyjson.Unmarshal(data, v) +} + +func NewDecoder(r io.Reader) *goccyjson.Decoder { + return goccyjson.NewDecoder(r) +} + +func NewEncoder(w io.Writer) *goccyjson.Encoder { + return goccyjson.NewEncoder(w) +} diff --git a/pkg/push/push.go b/pkg/push/push.go new file mode 100644 index 0000000..97f3027 --- /dev/null +++ b/pkg/push/push.go @@ -0,0 +1,170 @@ +package push + +import ( + "bytes" + "context" + "fmt" + "io" + "log" + "net/http" + "syscall" + "time" + + "github.com/coollabsio/sentinel/pkg/config" + "github.com/coollabsio/sentinel/pkg/dockerClient" + "github.com/coollabsio/sentinel/pkg/json" + "github.com/coollabsio/sentinel/pkg/types" + dockerTypes "github.com/docker/docker/api/types" +) + +type Pusher struct { + config *config.Config + client *http.Client + dockerClient *dockerClient.DockerClient +} + +func New(config *config.Config, dockerClient *dockerClient.DockerClient) *Pusher { + return &Pusher{ + config: config, + client: &http.Client{ + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + }, + Timeout: 10 * time.Second, + }, + dockerClient: dockerClient, + } +} + +func (p *Pusher) Run(ctx context.Context) { + ticker := time.NewTicker(time.Duration(p.config.PushIntervalSeconds) * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + fmt.Println("Push operation stopped") + return + case <-ticker.C: + p.GetPushData() + } + } +} + +func (p *Pusher) GetPushData() (map[string]interface{}, error) { + fmt.Printf("[%s] Pushing to [%s]\n", time.Now().Format("2006-01-02 15:04:05"), p.config.PushUrl) + containersData, err := p.containerData() + if err != nil { + log.Printf("Error getting containers data: %v", err) + return nil, err + } + filesystemUsageRoot, err := filesystemUsageRoot() + if err != nil { + log.Printf("Error getting disk usage: %v", err) + return nil, err + } + data := map[string]interface{}{ + "containers": containersData, + "filesystem_usage_root": filesystemUsageRoot, + } + jsonData, err := json.Marshal(data) + if err != nil { + log.Printf("Error marshalling data: %v", err) + return nil, err + } + req, err := http.NewRequest("POST", p.config.PushUrl, bytes.NewBuffer(jsonData)) + if err != nil { + log.Printf("Error creating request: %v", err) + return nil, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+p.config.Token) + resp, err := p.client.Do(req) + if err != nil { + log.Printf("Error pushing to [%s]: %v", p.config.PushUrl, err) + return nil, err + } + defer resp.Body.Close() + + if 
resp.StatusCode != 200 { + log.Printf("Error pushing to [%s]: status code %d", p.config.PushUrl, resp.StatusCode) + } + return data, nil +} + +func filesystemUsageRoot() (map[string]interface{}, error) { + fs := syscall.Statfs_t{} + err := syscall.Statfs("/", &fs) + if err != nil { + return nil, err + } + totalSpace := fs.Blocks * uint64(fs.Bsize) + freeSpace := fs.Bfree * uint64(fs.Bsize) + usedSpace := totalSpace - freeSpace + usedPercentage := float64(usedSpace) / float64(totalSpace) * 100 + + return map[string]interface{}{ + "used_percentage": fmt.Sprintf("%d", int(usedPercentage)), + }, nil +} + +func (p *Pusher) containerData() ([]types.Container, error) { + resp, err := p.dockerClient.MakeRequest("/containers/json?all=true") + if err != nil { + log.Printf("Error getting containers: %v", err) + return nil, err + } + defer resp.Body.Close() + + containersOutput, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Error reading containers response: %v", err) + return nil, err + } + + var containers []dockerTypes.Container + if err := json.Unmarshal(containersOutput, &containers); err != nil { + log.Printf("Error unmarshalling container list: %v", err) + return nil, err + } + + var containersData []types.Container + for _, container := range containers { + resp, err := p.dockerClient.MakeRequest(fmt.Sprintf("/containers/%s/json", container.ID)) + if err != nil { + log.Printf("Error inspecting container %s: %v", container.ID, err) + continue + } + defer resp.Body.Close() + + inspectOutput, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Error reading inspect response for container %s: %v", container.ID, err) + continue + } + + var inspectData dockerTypes.ContainerJSON + if err := json.Unmarshal(inspectOutput, &inspectData); err != nil { + log.Printf("Error unmarshalling inspect data for container %s: %v", container.ID, err) + continue + } + + healthStatus := "unhealthy" + if inspectData.State.Health != nil { + healthStatus = inspectData.State.Health.Status + } + + containersData = append(containersData, types.Container{ + Time: time.Now().Format("2006-01-02T15:04:05Z"), + ID: container.ID, + Image: container.Image, + Labels: container.Labels, + Name: container.Names[0][1:], + State: container.State, + HealthStatus: healthStatus, + }) + } + return containersData, nil +} diff --git a/pkg/types/container.go b/pkg/types/container.go new file mode 100644 index 0000000..48af32d --- /dev/null +++ b/pkg/types/container.go @@ -0,0 +1,22 @@ +package types + +/* Package contains types shared between collector and pusher */ + +type ContainerMetrics struct { + Name string `json:"name"` + Time string `json:"time"` + CPUUsagePercentage float64 `json:"cpu_usage_percentage"` + MemoryUsagePercentage float64 `json:"memory_usage_percentage"` + MemoryUsed uint64 `json:"memory_used"` + MemoryAvailable uint64 `json:"available_memory"` +} + +type Container struct { + Time string `json:"time"` + ID string `json:"id"` + Image string `json:"image"` + Name string `json:"name"` + State string `json:"state"` + Labels map[string]string `json:"labels"` + HealthStatus string `json:"health_status"` +} diff --git a/pkg/utils/helpers.go b/pkg/utils/helpers.go new file mode 100644 index 0000000..fe502d0 --- /dev/null +++ b/pkg/utils/helpers.go @@ -0,0 +1,12 @@ +package utils + +import ( + "fmt" + "time" +) + +func GetUnixTimeInMilliUTC() string { + queryTimeInUnix := time.Now().UTC().UnixMilli() + queryTimeInUnixString := fmt.Sprintf("%d", queryTimeInUnix) + return queryTimeInUnixString +} diff --git 
a/push.go b/push.go deleted file mode 100644 index 0152d89..0000000 --- a/push.go +++ /dev/null @@ -1,140 +0,0 @@ -package main - -import ( - "bytes" - "context" - "fmt" - "io" - "log" - "net/http" - "time" - - "github.com/docker/docker/api/types" - "github.com/gin-gonic/gin" -) - -func setupPushRoute(r *gin.Engine) { - r.POST("/api/push", func(c *gin.Context) { - incomingToken := c.GetHeader("Authorization") - if incomingToken != "Bearer "+token { - c.JSON(401, gin.H{"error": "Unauthorized"}) - return - } - data, err := getPushData() - if err != nil { - c.JSON(500, gin.H{"error": err.Error()}) - return - } - c.JSON(200, data) - }) -} - -func setupPush(ctx context.Context) { - ticker := time.NewTicker(time.Duration(pushIntervalSeconds) * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - fmt.Println("Push operation stopped") - return - case <-ticker.C: - getPushData() - } - } -} - -func getPushData() (map[string]interface{}, error) { - fmt.Printf("[%s] Pushing to [%s]\n", time.Now().Format("2006-01-02 15:04:05"), pushUrl) - containersData, err := containerData() - if err != nil { - log.Printf("Error getting containers data: %v", err) - return nil, err - } - data := map[string]interface{}{ - "containers": containersData, - } - jsonData, err := JSON.Marshal(data) - if err != nil { - log.Printf("Error marshalling data: %v", err) - return nil, err - } - req, err := http.NewRequest("POST", pushUrl, bytes.NewBuffer(jsonData)) - if err != nil { - log.Printf("Error creating request: %v", err) - return nil, err - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+token) - resp, err := httpClient.Do(req) - if err != nil { - log.Printf("Error pushing to [%s]: %v", pushUrl, err) - return nil, err - } - defer resp.Body.Close() - - if resp.StatusCode != 200 { - body, _ := io.ReadAll(resp.Body) - log.Printf("Error pushing to [%s]: status code %d, response: %s", pushUrl, resp.StatusCode, string(body)) - } - return data, nil -} - -func containerData() ([]Container, error) { - resp, err := makeDockerRequest("/containers/json?all=true") - if err != nil { - log.Printf("Error getting containers: %v", err) - return nil, err - } - defer resp.Body.Close() - - containersOutput, err := io.ReadAll(resp.Body) - if err != nil { - log.Printf("Error reading containers response: %v", err) - return nil, err - } - - var containers []types.Container - if err := JSON.Unmarshal(containersOutput, &containers); err != nil { - log.Printf("Error unmarshalling container list: %v", err) - return nil, err - } - - var containersData []Container - for _, container := range containers { - resp, err := makeDockerRequest(fmt.Sprintf("/containers/%s/json", container.ID)) - if err != nil { - log.Printf("Error inspecting container %s: %v", container.ID, err) - continue - } - defer resp.Body.Close() - - inspectOutput, err := io.ReadAll(resp.Body) - if err != nil { - log.Printf("Error reading inspect response for container %s: %v", container.ID, err) - continue - } - - var inspectData types.ContainerJSON - if err := JSON.Unmarshal(inspectOutput, &inspectData); err != nil { - log.Printf("Error unmarshalling inspect data for container %s: %v", container.ID, err) - continue - } - - healthStatus := "unhealthy" - if inspectData.State.Health != nil { - healthStatus = inspectData.State.Health.Status - } - - containersData = append(containersData, Container{ - Time: time.Now().Format("2006-01-02T15:04:05Z"), - ID: container.ID, - Image: container.Image, - Labels: 
container.Labels, - Name: container.Names[0][1:], - State: container.State, - HealthStatus: healthStatus, - }) - } - return containersData, nil -}
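
Review note: the refactor above replaces the former package-level globals (db, httpClient, makeDockerRequest, the collector/push/cleanup free functions) with small injectable packages: pkg/config, pkg/db, pkg/dockerClient, pkg/json, pkg/push and pkg/types. The diff does not include the updated main.go, so the following is only a minimal wiring sketch, under stated assumptions, of how these constructors could compose; the Collector's own constructor and import path are not part of this hunk and are therefore only referenced in a comment, and the sqlite driver registration is assumed to happen elsewhere (e.g. a blank import inside pkg/db, since db.New calls sql.Open("sqlite3", ...)).

    package main

    import (
        "context"
        "log"
        "os/signal"
        "syscall"

        "github.com/coollabsio/sentinel/pkg/config"
        "github.com/coollabsio/sentinel/pkg/db"
        "github.com/coollabsio/sentinel/pkg/dockerClient"
        "github.com/coollabsio/sentinel/pkg/push"
    )

    func main() {
        // Cancel background loops on SIGINT/SIGTERM; Run(ctx) in pkg/db and pkg/push
        // exits when this context is done.
        ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
        defer stop()

        cfg := config.NewDefaultConfig()

        // db.New creates the metrics directory and opens the sqlite database at
        // cfg.MetricsFile; a sqlite driver is assumed to be registered elsewhere.
        database, err := db.New(cfg)
        if err != nil {
            log.Fatalf("opening metrics database: %v", err)
        }
        defer database.Close()

        if err := database.CreateDefaultTables(); err != nil {
            log.Fatalf("creating default tables: %v", err)
        }

        docker := dockerClient.New()

        // Daily retention cleanup loop (replaces the old recursive cleanup() goroutine).
        go database.Run(ctx)

        // Push loop, gated by config rather than a package-level flag.
        if cfg.PushEnabled {
            go push.New(cfg, docker).Run(ctx)
        }

        // The Collector shown in this diff holds cfg, database and docker as fields
        // (c.config, c.database, c.client); its constructor is outside this hunk, but
        // it would presumably be wired and started here in the same way when
        // cfg.CollectorEnabled is true.

        <-ctx.Done()
    }

The practical gain of this shape over the removed globals is that every long-running loop takes its dependencies explicitly and stops via the shared context, which makes the collector, pusher and cleanup independently testable and cleanly cancellable on shutdown.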