Compare commits

...

14 Commits

Author SHA1 Message Date
Nikola Jokic
3fab744a4f Prepare 0.8.0 release (#3175) 2023-12-20 11:16:56 +01:00
Nikola Jokic
fe8c3bb789 Change listener container name (#3167) 2023-12-19 12:22:52 +01:00
Nikola Jokic
e40874f67f Fix assertion test in wait for delete (#3146) 2023-12-18 17:04:35 +01:00
Serge
d7d479172d Fix override listener pod spec (#3139) (#3161)
Signed-off-by: Serge Logvinov <serge.logvinov@sinextra.dev>
2023-12-18 16:50:06 +01:00
Nikola Jokic
31352924d7 Fix empty env and volumeMounts object on default setup (#3166) 2023-12-18 16:01:34 +01:00
dependabot[bot]
3e4201ac5f Bump k8s.io/client-go from 0.28.3 to 0.28.4 (#3125)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Nikola Jokic <jokicnikola07@gmail.com>
2023-12-14 12:58:35 +01:00
dependabot[bot]
a44b037d6b Bump golang.org/x/oauth2 from 0.14.0 to 0.15.0 (#3127)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-12-14 12:57:42 +01:00
dependabot[bot]
e11beea49b Bump golang.org/x/net from 0.18.0 to 0.19.0 (#3126)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Nikola Jokic <jokicnikola07@gmail.com>
2023-12-14 09:45:22 +01:00
dependabot[bot]
bfadad0830 Bump github.com/gruntwork-io/terratest from 0.41.24 to 0.46.7 (#3091)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Nikola Jokic <jokicnikola07@gmail.com>
2023-12-14 09:35:07 +01:00
Nikola Jokic
f7eb88ce9c Change minRunners behavior and fix the new listener min runners (#3139) 2023-12-13 19:39:21 +01:00
Nikola Jokic
0fd8eac305 Update user agent for new ghalistener (#3138) 2023-12-08 14:01:22 +01:00
Nikola Jokic
b78cadd901 Refactoring listener app with configurable fallback (#3096) 2023-12-08 13:41:06 +01:00
Nikola Jokic
202a97ab12 Modify user agent format with subsystem and is proxy configured information (#3116) 2023-12-08 13:16:29 +01:00
Toru Komatsu
b08d533105 Record the error when the creation pod fails (#3112)
Signed-off-by: utam0k <k0ma@utam0k.jp>
2023-12-07 21:11:52 +01:00
44 changed files with 2945 additions and 62 deletions

View File

@@ -193,7 +193,7 @@ runs:
shell: bash
run: |
helm uninstall ${{ inputs.arc-name }} --namespace ${{inputs.arc-namespace}} --debug
kubectl wait --timeout=10s --for=delete AutoScalingRunnerSet -n ${{inputs.arc-name}} -l app.kubernetes.io/instance=${{ inputs.arc-name }}
kubectl wait --timeout=30s --for=delete AutoScalingRunnerSet -n ${{inputs.arc-namespace}} -l app.kubernetes.io/instance=${{ inputs.arc-name }}
- name: Gather logs and cleanup
shell: bash

View File

@@ -16,7 +16,7 @@ env:
TARGET_ORG: actions-runner-controller
TARGET_REPO: arc_e2e_test_dummy
IMAGE_NAME: "arc-test-image"
IMAGE_VERSION: "0.7.0"
IMAGE_VERSION: "0.8.0"
concurrency:
# This will make sure we only apply the concurrency limits on pull requests
@@ -880,3 +880,98 @@ jobs:
helm uninstall "${{ steps.install_arc.outputs.ARC_NAME }}" --namespace "arc-runners" --debug
kubectl wait --timeout=10s --for=delete AutoScalingRunnerSet -n "${{ steps.install_arc.outputs.ARC_NAME }}" -l app.kubernetes.io/instance="${{ steps.install_arc.outputs.ARC_NAME }}"
kubectl logs deployment/arc-gha-rs-controller -n "arc-systems"
init-with-min-runners:
runs-on: ubuntu-latest
timeout-minutes: 20
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.id == github.repository_id
env:
WORKFLOW_FILE: arc-test-workflow.yaml
steps:
- uses: actions/checkout@v3
with:
ref: ${{ github.head_ref }}
- uses: ./.github/actions/setup-arc-e2e
id: setup
with:
app-id: ${{secrets.E2E_TESTS_ACCESS_APP_ID}}
app-pk: ${{secrets.E2E_TESTS_ACCESS_PK}}
image-name: ${{env.IMAGE_NAME}}
image-tag: ${{env.IMAGE_VERSION}}
target-org: ${{env.TARGET_ORG}}
- name: Install gha-runner-scale-set-controller
id: install_arc_controller
run: |
helm install arc \
--namespace "arc-systems" \
--create-namespace \
--set image.repository=${{ env.IMAGE_NAME }} \
--set image.tag=${{ env.IMAGE_VERSION }} \
--set flags.updateStrategy="eventual" \
./charts/gha-runner-scale-set-controller \
--debug
count=0
while true; do
POD_NAME=$(kubectl get pods -n arc-systems -l app.kubernetes.io/name=gha-rs-controller -o name)
if [ -n "$POD_NAME" ]; then
echo "Pod found: $POD_NAME"
break
fi
if [ "$count" -ge 60 ]; then
echo "Timeout waiting for controller pod with label app.kubernetes.io/name=gha-rs-controller"
exit 1
fi
sleep 1
count=$((count+1))
done
kubectl wait --timeout=30s --for=condition=ready pod -n arc-systems -l app.kubernetes.io/name=gha-rs-controller
kubectl get pod -n arc-systems
kubectl describe deployment arc-gha-rs-controller -n arc-systems
- name: Install gha-runner-scale-set
id: install_arc
run: |
ARC_NAME=${{github.job}}-$(date +'%M%S')$((($RANDOM + 100) % 100 + 1))
helm install "$ARC_NAME" \
--namespace "arc-runners" \
--create-namespace \
--set githubConfigUrl="https://github.com/${{ env.TARGET_ORG }}/${{env.TARGET_REPO}}" \
--set githubConfigSecret.github_token="${{ steps.setup.outputs.token }}" \
--set minRunners=5 \
./charts/gha-runner-scale-set \
--debug
echo "ARC_NAME=$ARC_NAME" >> $GITHUB_OUTPUT
count=0
while true; do
POD_NAME=$(kubectl get pods -n arc-systems -l actions.github.com/scale-set-name=$ARC_NAME -o name)
if [ -n "$POD_NAME" ]; then
echo "Pod found: $POD_NAME"
break
fi
if [ "$count" -ge 60 ]; then
echo "Timeout waiting for listener pod with label actions.github.com/scale-set-name=$ARC_NAME"
exit 1
fi
sleep 1
count=$((count+1))
done
kubectl wait --timeout=30s --for=condition=ready pod -n arc-systems -l actions.github.com/scale-set-name=$ARC_NAME
kubectl get pod -n arc-systems
- name: Ensure 5 runners are up
run: |
count=0
while true; do
pod_count=$(kubectl get pods -n arc-runners --no-headers | wc -l)
if [[ "$pod_count" = 5 ]]; then
echo "5 pods are up!"
break
fi
if [[ "$count" -ge 12 ]]; then
echo "Timeout waiting for 5 pods to be created"
exit 1
fi
sleep 1
count=$((count+1))
done

View File

@@ -38,6 +38,7 @@ RUN --mount=target=. \
export GOOS=${TARGETOS} GOARCH=${TARGETARCH} GOARM=${TARGETVARIANT#v} && \
go build -trimpath -ldflags="-s -w -X 'github.com/actions/actions-runner-controller/build.Version=${VERSION}' -X 'github.com/actions/actions-runner-controller/build.CommitSHA=${COMMIT_SHA}'" -o /out/manager main.go && \
go build -trimpath -ldflags="-s -w -X 'github.com/actions/actions-runner-controller/build.Version=${VERSION}' -X 'github.com/actions/actions-runner-controller/build.CommitSHA=${COMMIT_SHA}'" -o /out/github-runnerscaleset-listener ./cmd/githubrunnerscalesetlistener && \
go build -trimpath -ldflags="-s -w -X 'github.com/actions/actions-runner-controller/build.Version=${VERSION}' -X 'github.com/actions/actions-runner-controller/build.CommitSHA=${COMMIT_SHA}'" -o /out/ghalistener ./cmd/ghalistener && \
go build -trimpath -ldflags="-s -w" -o /out/github-webhook-server ./cmd/githubwebhookserver && \
go build -trimpath -ldflags="-s -w" -o /out/actions-metrics-server ./cmd/actionsmetricsserver && \
go build -trimpath -ldflags="-s -w" -o /out/sleep ./cmd/sleep
@@ -52,6 +53,7 @@ COPY --from=builder /out/manager .
COPY --from=builder /out/github-webhook-server .
COPY --from=builder /out/actions-metrics-server .
COPY --from=builder /out/github-runnerscaleset-listener .
COPY --from=builder /out/ghalistener .
COPY --from=builder /out/sleep .
USER 65532:65532

View File

@@ -15,13 +15,13 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.7.0
version: 0.8.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.7.0"
appVersion: "0.8.0"
home: https://github.com/actions/actions-runner-controller

View File

@@ -15,13 +15,13 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.7.0
version: 0.8.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.7.0"
appVersion: "0.8.0"
home: https://github.com/actions/actions-runner-controller

View File

@@ -385,6 +385,9 @@ volumeMounts:
{{- $setNodeExtraCaCerts = 1 }}
{{- $setRunnerUpdateCaCerts = 1 }}
{{- end }}
{{- $mountGitHubServerTLS := 0 }}
{{- if or $container.env $setNodeExtraCaCerts $setRunnerUpdateCaCerts }}
env:
{{- with $container.env }}
{{- range $i, $env := . }}
@@ -405,10 +408,12 @@ volumeMounts:
- name: RUNNER_UPDATE_CA_CERTS
value: "1"
{{- end }}
{{- $mountGitHubServerTLS := 0 }}
{{- if $tlsConfig.runnerMountPath }}
{{- $mountGitHubServerTLS = 1 }}
{{- end }}
{{- end }}
{{- if or $container.volumeMounts $mountGitHubServerTLS }}
volumeMounts:
{{- with $container.volumeMounts }}
{{- range $i, $volMount := . }}
@@ -423,6 +428,7 @@ volumeMounts:
mountPath: {{ clean (print $tlsConfig.runnerMountPath "/" $tlsConfig.certificateFrom.configMapKeyRef.key) }}
subPath: {{ $tlsConfig.certificateFrom.configMapKeyRef.key }}
{{- end }}
{{- end}}
{{- end }}
{{- end }}
{{- end }}

View File

@@ -2017,3 +2017,75 @@ func TestTemplateRenderedAutoscalingRunnerSetAnnotation_KubernetesModeCleanup(t
assert.Equal(t, value, autoscalingRunnerSet.Annotations[annotation], fmt.Sprintf("Annotation %q does not match the expected value", annotation))
}
}
func TestRunnerContainerEnvNotEmptyMap(t *testing.T) {
t.Parallel()
// Path to the helm chart we will test
helmChartPath, err := filepath.Abs("../../gha-runner-scale-set")
require.NoError(t, err)
testValuesPath, err := filepath.Abs("../tests/values.yaml")
require.NoError(t, err)
releaseName := "test-runners"
namespaceName := "test-" + strings.ToLower(random.UniqueId())
options := &helm.Options{
Logger: logger.Discard,
ValuesFiles: []string{testValuesPath},
KubectlOptions: k8s.NewKubectlOptions("", "", namespaceName),
}
output := helm.RenderTemplate(t, options, helmChartPath, releaseName, []string{"templates/autoscalingrunnerset.yaml"})
type testModel struct {
Spec struct {
Template struct {
Spec struct {
Containers []map[string]any `yaml:"containers"`
} `yaml:"spec"`
} `yaml:"template"`
} `yaml:"spec"`
}
var m testModel
helm.UnmarshalK8SYaml(t, output, &m)
_, ok := m.Spec.Template.Spec.Containers[0]["env"]
assert.False(t, ok, "env should not be set")
}
func TestRunnerContainerVolumeNotEmptyMap(t *testing.T) {
t.Parallel()
// Path to the helm chart we will test
helmChartPath, err := filepath.Abs("../../gha-runner-scale-set")
require.NoError(t, err)
testValuesPath, err := filepath.Abs("../tests/values.yaml")
require.NoError(t, err)
releaseName := "test-runners"
namespaceName := "test-" + strings.ToLower(random.UniqueId())
options := &helm.Options{
Logger: logger.Discard,
ValuesFiles: []string{testValuesPath},
KubectlOptions: k8s.NewKubectlOptions("", "", namespaceName),
}
output := helm.RenderTemplate(t, options, helmChartPath, releaseName, []string{"templates/autoscalingrunnerset.yaml"})
type testModel struct {
Spec struct {
Template struct {
Spec struct {
Containers []map[string]any `yaml:"containers"`
} `yaml:"spec"`
} `yaml:"template"`
} `yaml:"spec"`
}
var m testModel
helm.UnmarshalK8SYaml(t, output, &m)
_, ok := m.Spec.Template.Spec.Containers[0]["volumeMounts"]
assert.False(t, ok, "volumeMounts should not be set")
}

View File

@@ -39,7 +39,8 @@ githubConfigSecret:
## maxRunners is the max number of runners the autoscaling runner set will scale up to.
# maxRunners: 5
## minRunners is the min number of runners the autoscaling runner set will scale down to.
## minRunners is the min number of idle runners. The target number of runners created will be
## calculated as a sum of minRunners and the number of jobs assigned to the scale set.
# minRunners: 0
# runnerGroup: "default"
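The reworded comment above reflects the minRunners change from #3139: the scale set now targets minRunners idle runners on top of the jobs currently assigned to it, rather than treating minRunners purely as a scale-down floor. A minimal sketch of that calculation, assuming the target is additionally capped by maxRunners when one is set (the real logic lives in cmd/ghalistener/worker, which is not part of the excerpts shown here):

```go
package main

import "fmt"

// desiredRunners sketches the documented minRunners behavior: the target is
// the number of assigned jobs plus minRunners idle runners. Capping at
// maxRunners (when > 0) is an assumption based on the maxRunners comment
// above, not code copied from the worker.
func desiredRunners(assignedJobs, minRunners, maxRunners int) int {
	target := assignedJobs + minRunners
	if maxRunners > 0 && target > maxRunners {
		target = maxRunners
	}
	return target
}

func main() {
	fmt.Println(desiredRunners(3, 5, 0)) // 8: three assigned jobs plus five idle runners
	fmt.Println(desiredRunners(3, 5, 5)) // 5: capped by maxRunners
}
```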

cmd/ghalistener/app/app.go (new file, 133 lines)
View File

@@ -0,0 +1,133 @@
package app
import (
"context"
"errors"
"fmt"
"github.com/actions/actions-runner-controller/cmd/ghalistener/config"
"github.com/actions/actions-runner-controller/cmd/ghalistener/listener"
"github.com/actions/actions-runner-controller/cmd/ghalistener/metrics"
"github.com/actions/actions-runner-controller/cmd/ghalistener/worker"
"github.com/actions/actions-runner-controller/github/actions"
"github.com/go-logr/logr"
"golang.org/x/sync/errgroup"
)
// App is responsible for initializing required components and running the app.
type App struct {
// configured fields
config config.Config
logger logr.Logger
// initialized fields
listener Listener
worker Worker
metrics metrics.ServerPublisher
}
//go:generate mockery --name Listener --output ./mocks --outpkg mocks --case underscore
type Listener interface {
Listen(ctx context.Context, handler listener.Handler) error
}
//go:generate mockery --name Worker --output ./mocks --outpkg mocks --case underscore
type Worker interface {
HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error
HandleDesiredRunnerCount(ctx context.Context, desiredRunnerCount int) error
}
func New(config config.Config) (*App, error) {
app := &App{
config: config,
}
ghConfig, err := actions.ParseGitHubConfigFromURL(config.ConfigureUrl)
if err != nil {
return nil, fmt.Errorf("failed to parse GitHub config from URL: %w", err)
}
{
logger, err := config.Logger()
if err != nil {
return nil, fmt.Errorf("failed to create logger: %w", err)
}
app.logger = logger.WithName("listener-app")
}
actionsClient, err := config.ActionsClient(app.logger)
if err != nil {
return nil, fmt.Errorf("failed to create actions client: %w", err)
}
if config.MetricsAddr != "" {
app.metrics = metrics.NewExporter(metrics.ExporterConfig{
ScaleSetName: config.EphemeralRunnerSetName,
ScaleSetNamespace: config.EphemeralRunnerSetNamespace,
Enterprise: ghConfig.Enterprise,
Organization: ghConfig.Organization,
Repository: ghConfig.Repository,
ServerAddr: config.MetricsAddr,
ServerEndpoint: config.MetricsEndpoint,
})
}
worker, err := worker.New(
worker.Config{
EphemeralRunnerSetNamespace: config.EphemeralRunnerSetNamespace,
EphemeralRunnerSetName: config.EphemeralRunnerSetName,
MaxRunners: config.MaxRunners,
MinRunners: config.MinRunners,
},
worker.WithLogger(app.logger.WithName("worker")),
)
if err != nil {
return nil, fmt.Errorf("failed to create new kubernetes worker: %w", err)
}
app.worker = worker
listener, err := listener.New(listener.Config{
Client: actionsClient,
ScaleSetID: app.config.RunnerScaleSetId,
MinRunners: app.config.MinRunners,
MaxRunners: app.config.MaxRunners,
Logger: app.logger.WithName("listener"),
Metrics: app.metrics,
})
if err != nil {
return nil, fmt.Errorf("failed to create new listener: %w", err)
}
app.listener = listener
app.logger.Info("app initialized")
return app, nil
}
func (app *App) Run(ctx context.Context) error {
var errs []error
if app.worker == nil {
errs = append(errs, fmt.Errorf("worker not initialized"))
}
if app.listener == nil {
errs = append(errs, fmt.Errorf("listener not initialized"))
}
if err := errors.Join(errs...); err != nil {
return fmt.Errorf("app not initialized: %w", err)
}
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
app.logger.Info("Starting listener")
return app.listener.Listen(ctx, app.worker)
})
if app.metrics != nil {
g.Go(func() error {
app.logger.Info("Starting metrics server")
return app.metrics.ListenAndServe(ctx)
})
}
return g.Wait()
}
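App.Run refuses to start unless both the listener and the worker were initialized, then runs the listener (and, if configured, the metrics server) in an errgroup so that either one failing cancels the other. A hypothetical entrypoint wiring config.Read, New, and Run together might look like the sketch below; the actual cmd/ghalistener main.go is not shown in this compare, so the LISTENER_CONFIG_PATH environment variable is an assumption made for illustration.

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/actions/actions-runner-controller/cmd/ghalistener/app"
	"github.com/actions/actions-runner-controller/cmd/ghalistener/config"
)

func main() {
	// How the config path is supplied is an assumption; the real entrypoint
	// in this changeset decides that, and it is not shown here.
	cfg, err := config.Read(os.Getenv("LISTENER_CONFIG_PATH"))
	if err != nil {
		log.Fatalf("failed to read config: %v", err)
	}

	listenerApp, err := app.New(cfg)
	if err != nil {
		log.Fatalf("failed to initialize app: %v", err)
	}

	// Cancel the context on SIGINT/SIGTERM so Listen and the metrics server
	// shut down cleanly.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	if err := listenerApp.Run(ctx); err != nil {
		log.Fatalf("listener app exited with error: %v", err)
	}
}
```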

View File

@@ -0,0 +1,85 @@
package app
import (
"context"
"errors"
"testing"
appmocks "github.com/actions/actions-runner-controller/cmd/ghalistener/app/mocks"
"github.com/actions/actions-runner-controller/cmd/ghalistener/listener"
metricsMocks "github.com/actions/actions-runner-controller/cmd/ghalistener/metrics/mocks"
"github.com/actions/actions-runner-controller/cmd/ghalistener/worker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestApp_Run(t *testing.T) {
t.Parallel()
t.Run("ListenerWorkerGuard", func(t *testing.T) {
invalidApps := []*App{
{},
{worker: &worker.Worker{}},
{listener: &listener.Listener{}},
}
for _, app := range invalidApps {
assert.Error(t, app.Run(context.Background()))
}
})
t.Run("ExitsOnListenerError", func(t *testing.T) {
listener := appmocks.NewListener(t)
worker := appmocks.NewWorker(t)
listener.On("Listen", mock.Anything, mock.Anything).Return(errors.New("listener error")).Once()
app := &App{
listener: listener,
worker: worker,
}
err := app.Run(context.Background())
assert.Error(t, err)
})
t.Run("ExitsOnListenerNil", func(t *testing.T) {
listener := appmocks.NewListener(t)
worker := appmocks.NewWorker(t)
listener.On("Listen", mock.Anything, mock.Anything).Return(nil).Once()
app := &App{
listener: listener,
worker: worker,
}
err := app.Run(context.Background())
assert.NoError(t, err)
})
t.Run("CancelListenerOnMetricsServerError", func(t *testing.T) {
listener := appmocks.NewListener(t)
worker := appmocks.NewWorker(t)
metrics := metricsMocks.NewServerPublisher(t)
ctx := context.Background()
listener.On("Listen", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
ctx := args.Get(0).(context.Context)
go func() {
<-ctx.Done()
}()
}).Return(nil).Once()
metrics.On("ListenAndServe", mock.Anything).Return(errors.New("metrics server error")).Once()
app := &App{
listener: listener,
worker: worker,
metrics: metrics,
}
err := app.Run(ctx)
assert.Error(t, err)
})
}

View File

@@ -0,0 +1,43 @@
// Code generated by mockery v2.36.1. DO NOT EDIT.
package mocks
import (
context "context"
listener "github.com/actions/actions-runner-controller/cmd/ghalistener/listener"
mock "github.com/stretchr/testify/mock"
)
// Listener is an autogenerated mock type for the Listener type
type Listener struct {
mock.Mock
}
// Listen provides a mock function with given fields: ctx, handler
func (_m *Listener) Listen(ctx context.Context, handler listener.Handler) error {
ret := _m.Called(ctx, handler)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, listener.Handler) error); ok {
r0 = rf(ctx, handler)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewListener creates a new instance of Listener. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewListener(t interface {
mock.TestingT
Cleanup(func())
}) *Listener {
mock := &Listener{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -0,0 +1,58 @@
// Code generated by mockery v2.36.1. DO NOT EDIT.
package mocks
import (
actions "github.com/actions/actions-runner-controller/github/actions"
context "context"
mock "github.com/stretchr/testify/mock"
)
// Worker is an autogenerated mock type for the Worker type
type Worker struct {
mock.Mock
}
// HandleDesiredRunnerCount provides a mock function with given fields: ctx, desiredRunnerCount
func (_m *Worker) HandleDesiredRunnerCount(ctx context.Context, desiredRunnerCount int) error {
ret := _m.Called(ctx, desiredRunnerCount)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, int) error); ok {
r0 = rf(ctx, desiredRunnerCount)
} else {
r0 = ret.Error(0)
}
return r0
}
// HandleJobStarted provides a mock function with given fields: ctx, jobInfo
func (_m *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error {
ret := _m.Called(ctx, jobInfo)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *actions.JobStarted) error); ok {
r0 = rf(ctx, jobInfo)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewWorker creates a new instance of Worker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewWorker(t interface {
mock.TestingT
Cleanup(func())
}) *Worker {
mock := &Worker{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -0,0 +1,154 @@
package config
import (
"crypto/x509"
"encoding/json"
"fmt"
"os"
"github.com/actions/actions-runner-controller/build"
"github.com/actions/actions-runner-controller/github/actions"
"github.com/actions/actions-runner-controller/logging"
"github.com/go-logr/logr"
"golang.org/x/net/http/httpproxy"
)
type Config struct {
ConfigureUrl string `json:"configureUrl"`
AppID int64 `json:"appID"`
AppInstallationID int64 `json:"appInstallationID"`
AppPrivateKey string `json:"appPrivateKey"`
Token string `json:"token"`
EphemeralRunnerSetNamespace string `json:"ephemeralRunnerSetNamespace"`
EphemeralRunnerSetName string `json:"ephemeralRunnerSetName"`
MaxRunners int `json:"maxRunners"`
MinRunners int `json:"minRunners"`
RunnerScaleSetId int `json:"runnerScaleSetId"`
RunnerScaleSetName string `json:"runnerScaleSetName"`
ServerRootCA string `json:"serverRootCA"`
LogLevel string `json:"logLevel"`
LogFormat string `json:"logFormat"`
MetricsAddr string `json:"metricsAddr"`
MetricsEndpoint string `json:"metricsEndpoint"`
}
func Read(path string) (Config, error) {
f, err := os.Open(path)
if err != nil {
return Config{}, err
}
defer f.Close()
var config Config
if err := json.NewDecoder(f).Decode(&config); err != nil {
return Config{}, fmt.Errorf("failed to decode config: %w", err)
}
if err := config.validate(); err != nil {
return Config{}, fmt.Errorf("failed to validate config: %w", err)
}
return config, nil
}
func (c *Config) validate() error {
if len(c.ConfigureUrl) == 0 {
return fmt.Errorf("GitHubConfigUrl is not provided")
}
if len(c.EphemeralRunnerSetNamespace) == 0 || len(c.EphemeralRunnerSetName) == 0 {
return fmt.Errorf("EphemeralRunnerSetNamespace '%s' or EphemeralRunnerSetName '%s' is missing", c.EphemeralRunnerSetNamespace, c.EphemeralRunnerSetName)
}
if c.RunnerScaleSetId == 0 {
return fmt.Errorf("RunnerScaleSetId '%d' is missing", c.RunnerScaleSetId)
}
if c.MaxRunners < c.MinRunners {
return fmt.Errorf("MinRunners '%d' cannot be greater than MaxRunners '%d'", c.MinRunners, c.MaxRunners)
}
hasToken := len(c.Token) > 0
hasPrivateKeyConfig := c.AppID > 0 && c.AppPrivateKey != ""
if !hasToken && !hasPrivateKeyConfig {
return fmt.Errorf("GitHub auth credential is missing, token length: '%d', appId: '%d', installationId: '%d', private key length: '%d", len(c.Token), c.AppID, c.AppInstallationID, len(c.AppPrivateKey))
}
if hasToken && hasPrivateKeyConfig {
return fmt.Errorf("only one GitHub auth method supported at a time. Have both PAT and App auth: token length: '%d', appId: '%d', installationId: '%d', private key length: '%d", len(c.Token), c.AppID, c.AppInstallationID, len(c.AppPrivateKey))
}
return nil
}
func (c *Config) Logger() (logr.Logger, error) {
logLevel := string(logging.LogLevelDebug)
if c.LogLevel != "" {
logLevel = c.LogLevel
}
logFormat := string(logging.LogFormatText)
if c.LogFormat != "" {
logFormat = c.LogFormat
}
logger, err := logging.NewLogger(logLevel, logFormat)
if err != nil {
return logr.Logger{}, fmt.Errorf("NewLogger failed: %w", err)
}
return logger, nil
}
func (c *Config) ActionsClient(logger logr.Logger) (*actions.Client, error) {
var creds actions.ActionsAuth
switch c.Token {
case "":
creds.AppCreds = &actions.GitHubAppAuth{
AppID: c.AppID,
AppInstallationID: c.AppInstallationID,
AppPrivateKey: c.AppPrivateKey,
}
default:
creds.Token = c.Token
}
options := []actions.ClientOption{
actions.WithLogger(logger),
}
if c.ServerRootCA != "" {
systemPool, err := x509.SystemCertPool()
if err != nil {
return nil, fmt.Errorf("failed to load system cert pool: %w", err)
}
pool := systemPool.Clone()
ok := pool.AppendCertsFromPEM([]byte(c.ServerRootCA))
if !ok {
return nil, fmt.Errorf("failed to parse root certificate")
}
options = append(options, actions.WithRootCAs(pool))
}
client, err := actions.NewClient(c.ConfigureUrl, &creds, options...)
if err != nil {
return nil, fmt.Errorf("failed to create actions client: %w", err)
}
client.SetUserAgent(actions.UserAgentInfo{
Version: build.Version,
CommitSHA: build.CommitSHA,
ScaleSetID: c.RunnerScaleSetId,
HasProxy: hasProxy(),
Subsystem: "ghalistener",
})
return client, nil
}
func hasProxy() bool {
proxyFunc := httpproxy.FromEnvironment().ProxyFunc()
return proxyFunc != nil
}
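Read decodes a JSON file into Config via the json tags declared above and then calls validate, which rejects a missing configureUrl, a missing scale set ID, missing or duplicated credentials, and minRunners greater than maxRunners. The snippet below is only an illustration of that flow; every value in the sample file is a placeholder, and only the field names come from the struct tags above.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/actions/actions-runner-controller/cmd/ghalistener/config"
)

func main() {
	// Placeholder config; the field names match Config's json tags, the
	// values are made up for the example.
	raw := `{
  "configureUrl": "https://github.com/my-org",
  "token": "example-token",
  "ephemeralRunnerSetNamespace": "arc-runners",
  "ephemeralRunnerSetName": "my-scale-set",
  "runnerScaleSetId": 1,
  "minRunners": 0,
  "maxRunners": 5,
  "logLevel": "debug",
  "logFormat": "text"
}`

	path := filepath.Join(os.TempDir(), "listener-config.json")
	if err := os.WriteFile(path, []byte(raw), 0o600); err != nil {
		panic(err)
	}
	defer os.Remove(path)

	cfg, err := config.Read(path)
	if err != nil {
		panic(err)
	}
	fmt.Printf("listening for scale set %q in namespace %q\n",
		cfg.EphemeralRunnerSetName, cfg.EphemeralRunnerSetNamespace)
}
```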

View File

@@ -0,0 +1,92 @@
package config
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestConfigValidationMinMax(t *testing.T) {
config := &Config{
ConfigureUrl: "github.com/some_org/some_repo",
EphemeralRunnerSetNamespace: "namespace",
EphemeralRunnerSetName: "deployment",
RunnerScaleSetId: 1,
MinRunners: 5,
MaxRunners: 2,
Token: "token",
}
err := config.validate()
assert.ErrorContains(t, err, "MinRunners '5' cannot be greater than MaxRunners '2", "Expected error about MinRunners > MaxRunners")
}
func TestConfigValidationMissingToken(t *testing.T) {
config := &Config{
ConfigureUrl: "github.com/some_org/some_repo",
EphemeralRunnerSetNamespace: "namespace",
EphemeralRunnerSetName: "deployment",
RunnerScaleSetId: 1,
}
err := config.validate()
expectedError := fmt.Sprintf("GitHub auth credential is missing, token length: '%d', appId: '%d', installationId: '%d', private key length: '%d", len(config.Token), config.AppID, config.AppInstallationID, len(config.AppPrivateKey))
assert.ErrorContains(t, err, expectedError, "Expected error about missing auth")
}
func TestConfigValidationAppKey(t *testing.T) {
config := &Config{
AppID: 1,
AppInstallationID: 10,
ConfigureUrl: "github.com/some_org/some_repo",
EphemeralRunnerSetNamespace: "namespace",
EphemeralRunnerSetName: "deployment",
RunnerScaleSetId: 1,
}
err := config.validate()
expectedError := fmt.Sprintf("GitHub auth credential is missing, token length: '%d', appId: '%d', installationId: '%d', private key length: '%d", len(config.Token), config.AppID, config.AppInstallationID, len(config.AppPrivateKey))
assert.ErrorContains(t, err, expectedError, "Expected error about missing auth")
}
func TestConfigValidationOnlyOneTypeOfCredentials(t *testing.T) {
config := &Config{
AppID: 1,
AppInstallationID: 10,
AppPrivateKey: "asdf",
Token: "asdf",
ConfigureUrl: "github.com/some_org/some_repo",
EphemeralRunnerSetNamespace: "namespace",
EphemeralRunnerSetName: "deployment",
RunnerScaleSetId: 1,
}
err := config.validate()
expectedError := fmt.Sprintf("only one GitHub auth method supported at a time. Have both PAT and App auth: token length: '%d', appId: '%d', installationId: '%d', private key length: '%d", len(config.Token), config.AppID, config.AppInstallationID, len(config.AppPrivateKey))
assert.ErrorContains(t, err, expectedError, "Expected error about missing auth")
}
func TestConfigValidation(t *testing.T) {
config := &Config{
ConfigureUrl: "https://github.com/actions",
EphemeralRunnerSetNamespace: "namespace",
EphemeralRunnerSetName: "deployment",
RunnerScaleSetId: 1,
MinRunners: 1,
MaxRunners: 5,
Token: "asdf",
}
err := config.validate()
assert.NoError(t, err, "Expected no error")
}
func TestConfigValidationConfigUrl(t *testing.T) {
config := &Config{
EphemeralRunnerSetNamespace: "namespace",
EphemeralRunnerSetName: "deployment",
RunnerScaleSetId: 1,
}
err := config.validate()
assert.ErrorContains(t, err, "GitHubConfigUrl is not provided", "Expected error about missing ConfigureUrl")
}

View File

@@ -0,0 +1,388 @@
package listener
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"time"
"github.com/actions/actions-runner-controller/cmd/ghalistener/metrics"
"github.com/actions/actions-runner-controller/github/actions"
"github.com/go-logr/logr"
"github.com/google/uuid"
)
const (
sessionCreationMaxRetries = 10
)
// message types
const (
messageTypeJobAvailable = "JobAvailable"
messageTypeJobAssigned = "JobAssigned"
messageTypeJobStarted = "JobStarted"
messageTypeJobCompleted = "JobCompleted"
)
//go:generate mockery --name Client --output ./mocks --outpkg mocks --case underscore
type Client interface {
GetAcquirableJobs(ctx context.Context, runnerScaleSetId int) (*actions.AcquirableJobList, error)
CreateMessageSession(ctx context.Context, runnerScaleSetId int, owner string) (*actions.RunnerScaleSetSession, error)
GetMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, lastMessageId int64) (*actions.RunnerScaleSetMessage, error)
DeleteMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, messageId int64) error
AcquireJobs(ctx context.Context, runnerScaleSetId int, messageQueueAccessToken string, requestIds []int64) ([]int64, error)
RefreshMessageSession(ctx context.Context, runnerScaleSetId int, sessionId *uuid.UUID) (*actions.RunnerScaleSetSession, error)
}
type Config struct {
Client Client
ScaleSetID int
MinRunners int
MaxRunners int
Logger logr.Logger
Metrics metrics.Publisher
}
func (c *Config) Validate() error {
if c.Client == nil {
return errors.New("client is required")
}
if c.ScaleSetID == 0 {
return errors.New("scaleSetID is required")
}
if c.MinRunners < 0 {
return errors.New("minRunners must be greater than or equal to 0")
}
if c.MaxRunners < 0 {
return errors.New("maxRunners must be greater than or equal to 0")
}
if c.MaxRunners > 0 && c.MinRunners > c.MaxRunners {
return errors.New("minRunners must be less than or equal to maxRunners")
}
return nil
}
// The Listener's role is to manage all interactions with the actions service.
// It receives messages and processes them using the given handler.
type Listener struct {
// configured fields
scaleSetID int // The ID of the scale set associated with the listener.
client Client // The client used to interact with the scale set.
metrics metrics.Publisher // The publisher used to publish metrics.
// internal fields
logger logr.Logger // The logger used for logging.
hostname string // The hostname of the listener.
// updated fields
lastMessageID int64 // The ID of the last processed message.
session *actions.RunnerScaleSetSession // The session for managing the runner scale set.
}
func New(config Config) (*Listener, error) {
if err := config.Validate(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
}
listener := &Listener{
scaleSetID: config.ScaleSetID,
client: config.Client,
logger: config.Logger,
metrics: metrics.Discard,
}
if config.Metrics != nil {
listener.metrics = config.Metrics
}
listener.metrics.PublishStatic(config.MinRunners, config.MaxRunners)
hostname, err := os.Hostname()
if err != nil {
hostname = uuid.NewString()
listener.logger.Info("Failed to get hostname, fallback to uuid", "uuid", hostname, "error", err)
}
listener.hostname = hostname
return listener, nil
}
//go:generate mockery --name Handler --output ./mocks --outpkg mocks --case underscore
type Handler interface {
HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error
HandleDesiredRunnerCount(ctx context.Context, desiredRunnerCount int) error
}
// Listen listens for incoming messages and handles them using the provided handler.
// It continuously listens for messages until the context is cancelled.
// The initial message contains the current statistics and acquirable jobs, if any.
// The handler is responsible for handling the initial message and subsequent messages.
// If an error occurs during any step, Listen returns an error.
func (l *Listener) Listen(ctx context.Context, handler Handler) error {
if err := l.createSession(ctx); err != nil {
return fmt.Errorf("createSession failed: %w", err)
}
initialMessage := &actions.RunnerScaleSetMessage{
MessageId: 0,
MessageType: "RunnerScaleSetJobMessages",
Statistics: l.session.Statistics,
Body: "",
}
if l.session.Statistics.TotalAvailableJobs > 0 || l.session.Statistics.TotalAssignedJobs > 0 {
acquirableJobs, err := l.client.GetAcquirableJobs(ctx, l.scaleSetID)
if err != nil {
return fmt.Errorf("failed to call GetAcquirableJobs: %w", err)
}
acquirableJobsJson, err := json.Marshal(acquirableJobs)
if err != nil {
return fmt.Errorf("failed to marshal acquirable jobs: %w", err)
}
initialMessage.Body = string(acquirableJobsJson)
}
if err := handler.HandleDesiredRunnerCount(ctx, initialMessage.Statistics.TotalAssignedJobs); err != nil {
return fmt.Errorf("handling initial message failed: %w", err)
}
for {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled: %w", ctx.Err())
default:
}
msg, err := l.getMessage(ctx)
if err != nil {
return fmt.Errorf("failed to get message: %w", err)
}
if msg == nil {
continue
}
statistics, jobsStarted, err := l.parseMessage(ctx, msg)
if err != nil {
return fmt.Errorf("failed to parse message: %w", err)
}
l.lastMessageID = msg.MessageId
if err := l.deleteLastMessage(ctx); err != nil {
return fmt.Errorf("failed to delete message: %w", err)
}
for _, jobStarted := range jobsStarted {
if err := handler.HandleJobStarted(ctx, jobStarted); err != nil {
return fmt.Errorf("failed to handle job started: %w", err)
}
}
if err := handler.HandleDesiredRunnerCount(ctx, statistics.TotalAssignedJobs); err != nil {
return fmt.Errorf("failed to handle desired runner count: %w", err)
}
}
}
func (l *Listener) createSession(ctx context.Context) error {
var session *actions.RunnerScaleSetSession
var retries int
for {
var err error
session, err = l.client.CreateMessageSession(ctx, l.scaleSetID, l.hostname)
if err == nil {
break
}
clientErr := &actions.HttpClientSideError{}
if !errors.As(err, &clientErr) {
return fmt.Errorf("failed to create session: %w", err)
}
if clientErr.Code != http.StatusConflict {
return fmt.Errorf("failed to create session: %w", err)
}
retries++
if retries >= sessionCreationMaxRetries {
return fmt.Errorf("failed to create session after %d retries: %w", retries, err)
}
l.logger.Info("Unable to create message session. Will try again in 30 seconds", "error", err.Error())
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled: %w", ctx.Err())
case <-time.After(30 * time.Second):
}
}
statistics, err := json.Marshal(session.Statistics)
if err != nil {
return fmt.Errorf("failed to marshal statistics: %w", err)
}
l.logger.Info("Current runner scale set statistics.", "statistics", string(statistics))
l.session = session
return nil
}
func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessage, error) {
l.logger.Info("Getting next message", "lastMessageID", l.lastMessageID)
msg, err := l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID)
if err == nil { // if NO error
return msg, nil
}
expiredError := &actions.MessageQueueTokenExpiredError{}
if !errors.As(err, &expiredError) {
return nil, fmt.Errorf("failed to get next message: %w", err)
}
if err := l.refreshSession(ctx); err != nil {
return nil, err
}
l.logger.Info("Getting next message", "lastMessageID", l.lastMessageID)
msg, err = l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID)
if err != nil {
return nil, fmt.Errorf("failed to get next message after message session refresh: %w", err)
}
return msg, nil
}
func (l *Listener) deleteLastMessage(ctx context.Context) error {
l.logger.Info("Deleting last message", "lastMessageID", l.lastMessageID)
if err := l.client.DeleteMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID); err != nil {
return fmt.Errorf("failed to delete message: %w", err)
}
return nil
}
func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSetMessage) (*actions.RunnerScaleSetStatistic, []*actions.JobStarted, error) {
l.logger.Info("Processing message", "messageId", msg.MessageId, "messageType", msg.MessageType)
if msg.Statistics == nil {
return nil, nil, fmt.Errorf("invalid message: statistics is nil")
}
l.logger.Info("New runner scale set statistics.", "statistics", msg.Statistics)
if msg.MessageType != "RunnerScaleSetJobMessages" {
l.logger.Info("Skipping message", "messageType", msg.MessageType)
return nil, nil, fmt.Errorf("invalid message type: %s", msg.MessageType)
}
var batchedMessages []json.RawMessage
if len(msg.Body) > 0 {
if err := json.Unmarshal([]byte(msg.Body), &batchedMessages); err != nil {
return nil, nil, fmt.Errorf("failed to unmarshal batched messages: %w", err)
}
}
var availableJobs []int64
var startedJobs []*actions.JobStarted
for _, msg := range batchedMessages {
var messageType actions.JobMessageType
if err := json.Unmarshal(msg, &messageType); err != nil {
return nil, nil, fmt.Errorf("failed to decode job message type: %w", err)
}
switch messageType.MessageType {
case messageTypeJobAvailable:
var jobAvailable actions.JobAvailable
if err := json.Unmarshal(msg, &jobAvailable); err != nil {
return nil, nil, fmt.Errorf("failed to decode job available: %w", err)
}
l.logger.Info("Job available message received", "jobId", jobAvailable.RunnerRequestId)
availableJobs = append(availableJobs, jobAvailable.RunnerRequestId)
case messageTypeJobAssigned:
var jobAssigned actions.JobAssigned
if err := json.Unmarshal(msg, &jobAssigned); err != nil {
return nil, nil, fmt.Errorf("failed to decode job assigned: %w", err)
}
l.logger.Info("Job assigned message received", "jobId", jobAssigned.RunnerRequestId)
case messageTypeJobStarted:
var jobStarted actions.JobStarted
if err := json.Unmarshal(msg, &jobStarted); err != nil {
return nil, nil, fmt.Errorf("could not decode job started message. %w", err)
}
l.logger.Info("Job started message received.", "RequestId", jobStarted.RunnerRequestId, "RunnerId", jobStarted.RunnerId)
startedJobs = append(startedJobs, &jobStarted)
case messageTypeJobCompleted:
var jobCompleted actions.JobCompleted
if err := json.Unmarshal(msg, &jobCompleted); err != nil {
return nil, nil, fmt.Errorf("failed to decode job completed: %w", err)
}
l.logger.Info("Job completed message received.", "RequestId", jobCompleted.RunnerRequestId, "Result", jobCompleted.Result, "RunnerId", jobCompleted.RunnerId, "RunnerName", jobCompleted.RunnerName)
default:
l.logger.Info("unknown job message type.", "messageType", messageType.MessageType)
}
}
l.logger.Info("Available jobs.", "count", len(availableJobs), "requestIds", fmt.Sprint(availableJobs))
if len(availableJobs) > 0 {
acquired, err := l.acquireAvailableJobs(ctx, availableJobs)
if err != nil {
return nil, nil, err
}
l.logger.Info("Jobs are acquired", "count", len(acquired), "requestIds", fmt.Sprint(acquired))
}
return msg.Statistics, startedJobs, nil
}
func (l *Listener) acquireAvailableJobs(ctx context.Context, availableJobs []int64) ([]int64, error) {
l.logger.Info("Acquiring jobs")
ids, err := l.client.AcquireJobs(ctx, l.scaleSetID, l.session.MessageQueueAccessToken, availableJobs)
if err == nil { // if NO errors
return ids, nil
}
expiredError := &actions.MessageQueueTokenExpiredError{}
if !errors.As(err, &expiredError) {
return nil, fmt.Errorf("failed to acquire jobs: %w", err)
}
if err := l.refreshSession(ctx); err != nil {
return nil, err
}
ids, err = l.client.AcquireJobs(ctx, l.scaleSetID, l.session.MessageQueueAccessToken, availableJobs)
if err != nil {
return nil, fmt.Errorf("failed to acquire jobs after session refresh: %w", err)
}
return ids, nil
}
func (l *Listener) refreshSession(ctx context.Context) error {
l.logger.Info("Message queue token is expired during GetNextMessage, refreshing...")
session, err := l.client.RefreshMessageSession(ctx, l.session.RunnerScaleSet.Id, l.session.SessionId)
if err != nil {
return fmt.Errorf("refresh message session failed. %w", err)
}
l.session = session
return nil
}
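Listen drives the Handler contract defined above: HandleDesiredRunnerCount is called with the initial statistics and again after every processed message batch, and HandleJobStarted is called once per started job. The worker is the real implementation; the stand-in below is only a sketch of that contract, reusing the JobStarted fields (RunnerRequestId, RunnerId) already referenced in parseMessage.

```go
package main

import (
	"context"
	"fmt"

	"github.com/actions/actions-runner-controller/cmd/ghalistener/listener"
	"github.com/actions/actions-runner-controller/github/actions"
)

// loggingHandler is an illustrative Handler: it only logs the callbacks the
// Listener makes. The production implementation is the worker, which scales
// the EphemeralRunnerSet instead.
type loggingHandler struct{}

var _ listener.Handler = (*loggingHandler)(nil)

func (h *loggingHandler) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error {
	fmt.Printf("job started: request %d on runner %d\n", jobInfo.RunnerRequestId, jobInfo.RunnerId)
	return nil
}

func (h *loggingHandler) HandleDesiredRunnerCount(ctx context.Context, desiredRunnerCount int) error {
	fmt.Printf("desired runner count: %d\n", desiredRunnerCount)
	return nil
}

func main() {
	// In practice this handler is passed to (*listener.Listener).Listen.
	h := &loggingHandler{}
	_ = h.HandleDesiredRunnerCount(context.Background(), 3)
}
```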

View File

@@ -0,0 +1,613 @@
package listener
import (
"context"
"errors"
"net/http"
"testing"
"time"
listenermocks "github.com/actions/actions-runner-controller/cmd/ghalistener/listener/mocks"
"github.com/actions/actions-runner-controller/cmd/ghalistener/metrics"
metricsmocks "github.com/actions/actions-runner-controller/cmd/ghalistener/metrics/mocks"
"github.com/actions/actions-runner-controller/github/actions"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestNew(t *testing.T) {
t.Parallel()
t.Run("InvalidConfig", func(t *testing.T) {
t.Parallel()
var config Config
_, err := New(config)
assert.NotNil(t, err)
})
t.Run("ValidConfig", func(t *testing.T) {
t.Parallel()
config := Config{
Client: listenermocks.NewClient(t),
ScaleSetID: 1,
Metrics: metrics.Discard,
}
l, err := New(config)
assert.Nil(t, err)
assert.NotNil(t, l)
})
t.Run("SetStaticMetrics", func(t *testing.T) {
t.Parallel()
metrics := metricsmocks.NewPublisher(t)
metrics.On("PublishStatic", mock.Anything, mock.Anything).Once()
config := Config{
Client: listenermocks.NewClient(t),
ScaleSetID: 1,
Metrics: metrics,
}
l, err := New(config)
assert.Nil(t, err)
assert.NotNil(t, l)
})
}
func TestListener_createSession(t *testing.T) {
t.Parallel()
t.Run("FailOnce", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
client.On("CreateMessageSession", ctx, mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
err = l.createSession(ctx)
assert.NotNil(t, err)
})
t.Run("FailContext", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
client.On("CreateMessageSession", ctx, mock.Anything, mock.Anything).Return(nil,
&actions.HttpClientSideError{Code: http.StatusConflict}).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
err = l.createSession(ctx)
assert.True(t, errors.Is(err, context.DeadlineExceeded))
})
t.Run("SetsSession", func(t *testing.T) {
t.Parallel()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
uuid := uuid.New()
session := &actions.RunnerScaleSetSession{
SessionId: &uuid,
OwnerName: "example",
RunnerScaleSet: &actions.RunnerScaleSet{},
MessageQueueUrl: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: nil,
}
client.On("CreateMessageSession", mock.Anything, mock.Anything, mock.Anything).Return(session, nil).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
err = l.createSession(context.Background())
assert.Nil(t, err)
assert.Equal(t, session, l.session)
})
}
func TestListener_getMessage(t *testing.T) {
t.Parallel()
t.Run("ReceivesMessage", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
want := &actions.RunnerScaleSetMessage{
MessageId: 1,
}
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(want, nil).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
l.session = &actions.RunnerScaleSetSession{}
got, err := l.getMessage(ctx)
assert.Nil(t, err)
assert.Equal(t, want, got)
})
t.Run("NotExpiredError", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.HttpClientSideError{Code: http.StatusNotFound}).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
l.session = &actions.RunnerScaleSetSession{}
_, err = l.getMessage(ctx)
assert.NotNil(t, err)
})
t.Run("RefreshAndSucceeds", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
uuid := uuid.New()
session := &actions.RunnerScaleSetSession{
SessionId: &uuid,
OwnerName: "example",
RunnerScaleSet: &actions.RunnerScaleSet{},
MessageQueueUrl: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: nil,
}
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Once()
want := &actions.RunnerScaleSetMessage{
MessageId: 1,
}
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(want, nil).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
l.session = &actions.RunnerScaleSetSession{
SessionId: &uuid,
RunnerScaleSet: &actions.RunnerScaleSet{},
}
got, err := l.getMessage(ctx)
assert.Nil(t, err)
assert.Equal(t, want, got)
})
t.Run("RefreshAndFails", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
uuid := uuid.New()
session := &actions.RunnerScaleSetSession{
SessionId: &uuid,
OwnerName: "example",
RunnerScaleSet: &actions.RunnerScaleSet{},
MessageQueueUrl: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: nil,
}
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Twice()
config.Client = client
l, err := New(config)
require.Nil(t, err)
l.session = &actions.RunnerScaleSetSession{
SessionId: &uuid,
RunnerScaleSet: &actions.RunnerScaleSet{},
}
got, err := l.getMessage(ctx)
assert.NotNil(t, err)
assert.Nil(t, got)
})
}
func TestListener_refreshSession(t *testing.T) {
t.Parallel()
t.Run("SuccessfullyRefreshes", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
newUUID := uuid.New()
session := &actions.RunnerScaleSetSession{
SessionId: &newUUID,
OwnerName: "example",
RunnerScaleSet: &actions.RunnerScaleSet{},
MessageQueueUrl: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: nil,
}
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
oldUUID := uuid.New()
l.session = &actions.RunnerScaleSetSession{
SessionId: &oldUUID,
RunnerScaleSet: &actions.RunnerScaleSet{},
}
err = l.refreshSession(ctx)
assert.Nil(t, err)
assert.Equal(t, session, l.session)
})
t.Run("FailsToRefresh", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(nil, errors.New("error")).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
oldUUID := uuid.New()
oldSession := &actions.RunnerScaleSetSession{
SessionId: &oldUUID,
RunnerScaleSet: &actions.RunnerScaleSet{},
}
l.session = oldSession
err = l.refreshSession(ctx)
assert.NotNil(t, err)
assert.Equal(t, oldSession, l.session)
})
}
func TestListener_deleteLastMessage(t *testing.T) {
t.Parallel()
t.Run("SuccessfullyDeletes", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
client.On("DeleteMessage", ctx, mock.Anything, mock.Anything, mock.MatchedBy(func(lastMessageID any) bool {
return lastMessageID.(int64) == int64(5)
})).Return(nil).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
l.session = &actions.RunnerScaleSetSession{}
l.lastMessageID = 5
err = l.deleteLastMessage(ctx)
assert.Nil(t, err)
})
t.Run("FailsToDelete", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
client.On("DeleteMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("error")).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
l.session = &actions.RunnerScaleSetSession{}
l.lastMessageID = 5
err = l.deleteLastMessage(ctx)
assert.NotNil(t, err)
})
}
func TestListener_Listen(t *testing.T) {
t.Parallel()
t.Run("CreateSessionFails", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
client.On("CreateMessageSession", ctx, mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
err = l.Listen(ctx, nil)
assert.NotNil(t, err)
})
t.Run("CallHandleRegardlessOfInitialMessage", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
uuid := uuid.New()
session := &actions.RunnerScaleSetSession{
SessionId: &uuid,
OwnerName: "example",
RunnerScaleSet: &actions.RunnerScaleSet{},
MessageQueueUrl: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: &actions.RunnerScaleSetStatistic{},
}
client.On("CreateMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
var called bool
handler := listenermocks.NewHandler(t)
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).
Return(nil).
Run(
func(mock.Arguments) {
called = true
cancel()
},
).
Once()
err = l.Listen(ctx, handler)
assert.True(t, errors.Is(err, context.Canceled))
assert.True(t, called)
})
}
func TestListener_acquireAvailableJobs(t *testing.T) {
t.Parallel()
t.Run("FailingToAcquireJobs", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
uuid := uuid.New()
l.session = &actions.RunnerScaleSetSession{
SessionId: &uuid,
OwnerName: "example",
RunnerScaleSet: &actions.RunnerScaleSet{},
MessageQueueUrl: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: &actions.RunnerScaleSetStatistic{},
}
_, err = l.acquireAvailableJobs(ctx, []int64{1, 2, 3})
assert.Error(t, err)
})
t.Run("SuccessfullyAcquiresJobsOnFirstRun", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
jobIDs := []int64{1, 2, 3}
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).Return(jobIDs, nil).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
uuid := uuid.New()
l.session = &actions.RunnerScaleSetSession{
SessionId: &uuid,
OwnerName: "example",
RunnerScaleSet: &actions.RunnerScaleSet{},
MessageQueueUrl: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: &actions.RunnerScaleSetStatistic{},
}
acquiredJobIDs, err := l.acquireAvailableJobs(ctx, []int64{1, 2, 3})
assert.NoError(t, err)
assert.Equal(t, jobIDs, acquiredJobIDs)
})
t.Run("RefreshAndSucceeds", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
uuid := uuid.New()
session := &actions.RunnerScaleSetSession{
SessionId: &uuid,
OwnerName: "example",
RunnerScaleSet: &actions.RunnerScaleSet{},
MessageQueueUrl: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: nil,
}
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()
// First call to AcquireJobs will fail with a token expired error
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Once()
// Second call to AcquireJobs will succeed
want := []int64{1, 2, 3}
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).Return(want, nil).Once()
config.Client = client
l, err := New(config)
require.Nil(t, err)
l.session = &actions.RunnerScaleSetSession{
SessionId: &uuid,
RunnerScaleSet: &actions.RunnerScaleSet{},
}
got, err := l.acquireAvailableJobs(ctx, want)
assert.Nil(t, err)
assert.Equal(t, want, got)
})
t.Run("RefreshAndFails", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
}
client := listenermocks.NewClient(t)
uuid := uuid.New()
session := &actions.RunnerScaleSetSession{
SessionId: &uuid,
OwnerName: "example",
RunnerScaleSet: &actions.RunnerScaleSet{},
MessageQueueUrl: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: nil,
}
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Twice()
config.Client = client
l, err := New(config)
require.Nil(t, err)
l.session = &actions.RunnerScaleSetSession{
SessionId: &uuid,
RunnerScaleSet: &actions.RunnerScaleSet{},
}
got, err := l.acquireAvailableJobs(ctx, []int64{1, 2, 3})
assert.NotNil(t, err)
assert.Nil(t, got)
})
}

View File

@@ -0,0 +1,176 @@
// Code generated by mockery v2.36.1. DO NOT EDIT.
package mocks
import (
context "context"
actions "github.com/actions/actions-runner-controller/github/actions"
mock "github.com/stretchr/testify/mock"
uuid "github.com/google/uuid"
)
// Client is an autogenerated mock type for the Client type
type Client struct {
mock.Mock
}
// AcquireJobs provides a mock function with given fields: ctx, runnerScaleSetId, messageQueueAccessToken, requestIds
func (_m *Client) AcquireJobs(ctx context.Context, runnerScaleSetId int, messageQueueAccessToken string, requestIds []int64) ([]int64, error) {
ret := _m.Called(ctx, runnerScaleSetId, messageQueueAccessToken, requestIds)
var r0 []int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int, string, []int64) ([]int64, error)); ok {
return rf(ctx, runnerScaleSetId, messageQueueAccessToken, requestIds)
}
if rf, ok := ret.Get(0).(func(context.Context, int, string, []int64) []int64); ok {
r0 = rf(ctx, runnerScaleSetId, messageQueueAccessToken, requestIds)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int, string, []int64) error); ok {
r1 = rf(ctx, runnerScaleSetId, messageQueueAccessToken, requestIds)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// CreateMessageSession provides a mock function with given fields: ctx, runnerScaleSetId, owner
func (_m *Client) CreateMessageSession(ctx context.Context, runnerScaleSetId int, owner string) (*actions.RunnerScaleSetSession, error) {
ret := _m.Called(ctx, runnerScaleSetId, owner)
var r0 *actions.RunnerScaleSetSession
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int, string) (*actions.RunnerScaleSetSession, error)); ok {
return rf(ctx, runnerScaleSetId, owner)
}
if rf, ok := ret.Get(0).(func(context.Context, int, string) *actions.RunnerScaleSetSession); ok {
r0 = rf(ctx, runnerScaleSetId, owner)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*actions.RunnerScaleSetSession)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int, string) error); ok {
r1 = rf(ctx, runnerScaleSetId, owner)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DeleteMessage provides a mock function with given fields: ctx, messageQueueUrl, messageQueueAccessToken, messageId
func (_m *Client) DeleteMessage(ctx context.Context, messageQueueUrl string, messageQueueAccessToken string, messageId int64) error {
ret := _m.Called(ctx, messageQueueUrl, messageQueueAccessToken, messageId)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) error); ok {
r0 = rf(ctx, messageQueueUrl, messageQueueAccessToken, messageId)
} else {
r0 = ret.Error(0)
}
return r0
}
// GetAcquirableJobs provides a mock function with given fields: ctx, runnerScaleSetId
func (_m *Client) GetAcquirableJobs(ctx context.Context, runnerScaleSetId int) (*actions.AcquirableJobList, error) {
ret := _m.Called(ctx, runnerScaleSetId)
var r0 *actions.AcquirableJobList
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int) (*actions.AcquirableJobList, error)); ok {
return rf(ctx, runnerScaleSetId)
}
if rf, ok := ret.Get(0).(func(context.Context, int) *actions.AcquirableJobList); ok {
r0 = rf(ctx, runnerScaleSetId)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*actions.AcquirableJobList)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int) error); ok {
r1 = rf(ctx, runnerScaleSetId)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetMessage provides a mock function with given fields: ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId
func (_m *Client) GetMessage(ctx context.Context, messageQueueUrl string, messageQueueAccessToken string, lastMessageId int64) (*actions.RunnerScaleSetMessage, error) {
ret := _m.Called(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId)
var r0 *actions.RunnerScaleSetMessage
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) (*actions.RunnerScaleSetMessage, error)); ok {
return rf(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId)
}
if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) *actions.RunnerScaleSetMessage); ok {
r0 = rf(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*actions.RunnerScaleSetMessage)
}
}
if rf, ok := ret.Get(1).(func(context.Context, string, string, int64) error); ok {
r1 = rf(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// RefreshMessageSession provides a mock function with given fields: ctx, runnerScaleSetId, sessionId
func (_m *Client) RefreshMessageSession(ctx context.Context, runnerScaleSetId int, sessionId *uuid.UUID) (*actions.RunnerScaleSetSession, error) {
ret := _m.Called(ctx, runnerScaleSetId, sessionId)
var r0 *actions.RunnerScaleSetSession
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int, *uuid.UUID) (*actions.RunnerScaleSetSession, error)); ok {
return rf(ctx, runnerScaleSetId, sessionId)
}
if rf, ok := ret.Get(0).(func(context.Context, int, *uuid.UUID) *actions.RunnerScaleSetSession); ok {
r0 = rf(ctx, runnerScaleSetId, sessionId)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*actions.RunnerScaleSetSession)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int, *uuid.UUID) error); ok {
r1 = rf(ctx, runnerScaleSetId, sessionId)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewClient(t interface {
mock.TestingT
Cleanup(func())
}) *Client {
mock := &Client{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
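A hedged usage sketch (not part of the generated file): a test could wire this mock roughly as below; the mocks import path and the test body are assumptions for illustration.

package mocks_test

import (
    "context"
    "testing"

    "github.com/actions/actions-runner-controller/cmd/ghalistener/listener/mocks" // assumed path
    "github.com/actions/actions-runner-controller/github/actions"
    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"
)

func TestClientMockExample(t *testing.T) {
    // NewClient registers AssertExpectations via t.Cleanup, so unmet expectations fail the test.
    client := mocks.NewClient(t)
    client.On("GetAcquirableJobs", mock.Anything, 1).
        Return(&actions.AcquirableJobList{}, nil).
        Once()

    jobs, err := client.GetAcquirableJobs(context.Background(), 1)
    require.NoError(t, err)
    require.NotNil(t, jobs)
}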

View File

@@ -0,0 +1,58 @@
// Code generated by mockery v2.36.1. DO NOT EDIT.
package mocks
import (
context "context"
actions "github.com/actions/actions-runner-controller/github/actions"
mock "github.com/stretchr/testify/mock"
)
// Handler is an autogenerated mock type for the Handler type
type Handler struct {
mock.Mock
}
// HandleDesiredRunnerCount provides a mock function with given fields: ctx, desiredRunnerCount
func (_m *Handler) HandleDesiredRunnerCount(ctx context.Context, desiredRunnerCount int) error {
ret := _m.Called(ctx, desiredRunnerCount)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, int) error); ok {
r0 = rf(ctx, desiredRunnerCount)
} else {
r0 = ret.Error(0)
}
return r0
}
// HandleJobStarted provides a mock function with given fields: ctx, jobInfo
func (_m *Handler) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error {
ret := _m.Called(ctx, jobInfo)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *actions.JobStarted) error); ok {
r0 = rf(ctx, jobInfo)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewHandler creates a new instance of Handler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewHandler(t interface {
mock.TestingT
Cleanup(func())
}) *Handler {
mock := &Handler{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

cmd/ghalistener/main.go Normal file
View File

@@ -0,0 +1,40 @@
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/actions/actions-runner-controller/cmd/ghalistener/app"
"github.com/actions/actions-runner-controller/cmd/ghalistener/config"
)
func main() {
configPath, ok := os.LookupEnv("LISTENER_CONFIG_PATH")
if !ok {
fmt.Fprintf(os.Stderr, "Error: LISTENER_CONFIG_PATH environment variable is not set\n")
os.Exit(1)
}
config, err := config.Read(configPath)
if err != nil {
log.Printf("Failed to read config: %v", err)
os.Exit(1)
}
app, err := app.New(config)
if err != nil {
log.Printf("Failed to initialize app: %v", err)
os.Exit(1)
}
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
if err := app.Run(ctx); err != nil {
log.Printf("Application returned an error: %v", err)
os.Exit(1)
}
}

View File

@@ -0,0 +1,387 @@
package metrics
import (
"context"
"net/http"
"strconv"
"github.com/actions/actions-runner-controller/github/actions"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const (
labelKeyRunnerScaleSetName = "name"
labelKeyRunnerScaleSetNamespace = "namespace"
labelKeyEnterprise = "enterprise"
labelKeyOrganization = "organization"
labelKeyRepository = "repository"
labelKeyJobName = "job_name"
labelKeyJobWorkflowRef = "job_workflow_ref"
labelKeyEventName = "event_name"
labelKeyJobResult = "job_result"
labelKeyRunnerID = "runner_id"
labelKeyRunnerName = "runner_name"
)
const githubScaleSetSubsystem = "gha"
// labels
var (
scaleSetLabels = []string{
labelKeyRunnerScaleSetName,
labelKeyRepository,
labelKeyOrganization,
labelKeyEnterprise,
labelKeyRunnerScaleSetNamespace,
}
jobLabels = []string{
labelKeyRepository,
labelKeyOrganization,
labelKeyEnterprise,
labelKeyJobName,
labelKeyJobWorkflowRef,
labelKeyEventName,
}
completedJobsTotalLabels = append(jobLabels, labelKeyJobResult, labelKeyRunnerID, labelKeyRunnerName)
jobExecutionDurationLabels = append(jobLabels, labelKeyJobResult, labelKeyRunnerID, labelKeyRunnerName)
startedJobsTotalLabels = append(jobLabels, labelKeyRunnerID, labelKeyRunnerName)
jobStartupDurationLabels = append(jobLabels, labelKeyRunnerID, labelKeyRunnerName)
)
var (
assignedJobs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: githubScaleSetSubsystem,
Name: "assigned_jobs",
Help: "Number of jobs assigned to this scale set.",
},
scaleSetLabels,
)
runningJobs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: githubScaleSetSubsystem,
Name: "running_jobs",
Help: "Number of jobs running (or about to be run).",
},
scaleSetLabels,
)
registeredRunners = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: githubScaleSetSubsystem,
Name: "registered_runners",
Help: "Number of runners registered by the scale set.",
},
scaleSetLabels,
)
busyRunners = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: githubScaleSetSubsystem,
Name: "busy_runners",
Help: "Number of registered runners running a job.",
},
scaleSetLabels,
)
minRunners = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: githubScaleSetSubsystem,
Name: "min_runners",
Help: "Minimum number of runners.",
},
scaleSetLabels,
)
maxRunners = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: githubScaleSetSubsystem,
Name: "max_runners",
Help: "Maximum number of runners.",
},
scaleSetLabels,
)
desiredRunners = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: githubScaleSetSubsystem,
Name: "desired_runners",
Help: "Number of runners desired by the scale set.",
},
scaleSetLabels,
)
idleRunners = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: githubScaleSetSubsystem,
Name: "idle_runners",
Help: "Number of registered runners not running a job.",
},
scaleSetLabels,
)
startedJobsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: githubScaleSetSubsystem,
Name: "started_jobs_total",
Help: "Total number of jobs started.",
},
startedJobsTotalLabels,
)
completedJobsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "completed_jobs_total",
Help: "Total number of jobs completed.",
Subsystem: githubScaleSetSubsystem,
},
completedJobsTotalLabels,
)
jobStartupDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: githubScaleSetSubsystem,
Name: "job_startup_duration_seconds",
Help: "Time spent waiting for workflow job to get started on the runner owned by the scale set (in seconds).",
Buckets: runtimeBuckets,
},
jobStartupDurationLabels,
)
jobExecutionDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: githubScaleSetSubsystem,
Name: "job_execution_duration_seconds",
Help: "Time spent executing workflow jobs by the scale set (in seconds).",
Buckets: runtimeBuckets,
},
jobExecutionDurationLabels,
)
)
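// runtimeBuckets are histogram bucket boundaries in seconds, covering 10ms through 1 hour.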
var runtimeBuckets []float64 = []float64{
0.01,
0.05,
0.1,
0.5,
1,
2,
3,
4,
5,
6,
7,
8,
9,
10,
12,
15,
18,
20,
25,
30,
40,
50,
60,
70,
80,
90,
100,
110,
120,
150,
180,
210,
240,
300,
360,
420,
480,
540,
600,
900,
1200,
1800,
2400,
3000,
3600,
}
type baseLabels struct {
scaleSetName string
scaleSetNamespace string
enterprise string
organization string
repository string
}
func (b *baseLabels) jobLabels(jobBase *actions.JobMessageBase) prometheus.Labels {
return prometheus.Labels{
labelKeyEnterprise: b.enterprise,
labelKeyOrganization: b.organization,
labelKeyRepository: b.repository,
labelKeyJobName: jobBase.JobDisplayName,
labelKeyJobWorkflowRef: jobBase.JobWorkflowRef,
labelKeyEventName: jobBase.EventName,
}
}
func (b *baseLabels) scaleSetLabels() prometheus.Labels {
return prometheus.Labels{
labelKeyRunnerScaleSetName: b.scaleSetName,
labelKeyRunnerScaleSetNamespace: b.scaleSetNamespace,
labelKeyEnterprise: b.enterprise,
labelKeyOrganization: b.organization,
labelKeyRepository: b.repository,
}
}
func (b *baseLabels) completedJobLabels(msg *actions.JobCompleted) prometheus.Labels {
l := b.jobLabels(&msg.JobMessageBase)
l[labelKeyRunnerID] = strconv.Itoa(msg.RunnerId)
l[labelKeyJobResult] = msg.Result
l[labelKeyRunnerName] = msg.RunnerName
return l
}
func (b *baseLabels) startedJobLabels(msg *actions.JobStarted) prometheus.Labels {
l := b.jobLabels(&msg.JobMessageBase)
l[labelKeyRunnerID] = strconv.Itoa(msg.RunnerId)
l[labelKeyRunnerName] = msg.RunnerName
return l
}
//go:generate mockery --name Publisher --output ./mocks --outpkg mocks --case underscore
type Publisher interface {
PublishStatic(min, max int)
PublishStatistics(stats *actions.RunnerScaleSetStatistic)
PublishJobStarted(msg *actions.JobStarted)
PublishJobCompleted(msg *actions.JobCompleted)
PublishDesiredRunners(count int)
}
//go:generate mockery --name ServerPublisher --output ./mocks --outpkg mocks --case underscore
type ServerPublisher interface {
Publisher
ListenAndServe(ctx context.Context) error
}
var _ Publisher = &discard{}
var _ ServerPublisher = &exporter{}
var Discard Publisher = &discard{}
type exporter struct {
logger logr.Logger
baseLabels
srv *http.Server
}
type ExporterConfig struct {
ScaleSetName string
ScaleSetNamespace string
Enterprise string
Organization string
Repository string
ServerAddr string
ServerEndpoint string
Logger logr.Logger
}
func NewExporter(config ExporterConfig) ServerPublisher {
reg := prometheus.NewRegistry()
reg.MustRegister(
assignedJobs,
runningJobs,
registeredRunners,
busyRunners,
minRunners,
maxRunners,
desiredRunners,
idleRunners,
startedJobsTotal,
completedJobsTotal,
jobStartupDurationSeconds,
jobExecutionDurationSeconds,
)
mux := http.NewServeMux()
mux.Handle(
config.ServerEndpoint,
promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}),
)
return &exporter{
logger: config.Logger.WithName("metrics"),
baseLabels: baseLabels{
scaleSetName: config.ScaleSetName,
scaleSetNamespace: config.ScaleSetNamespace,
enterprise: config.Enterprise,
organization: config.Organization,
repository: config.Repository,
},
srv: &http.Server{
Addr: config.ServerAddr,
Handler: mux,
},
}
}
func (e *exporter) ListenAndServe(ctx context.Context) error {
e.logger.Info("starting metrics server", "addr", e.srv.Addr)
go func() {
<-ctx.Done()
e.logger.Info("stopping metrics server")
e.srv.Shutdown(ctx)
}()
return e.srv.ListenAndServe()
}
func (e *exporter) PublishStatic(min, max int) {
l := e.scaleSetLabels()
maxRunners.With(l).Set(float64(max))
minRunners.With(l).Set(float64(min))
}
func (e *exporter) PublishStatistics(stats *actions.RunnerScaleSetStatistic) {
l := e.scaleSetLabels()
assignedJobs.With(l).Set(float64(stats.TotalAssignedJobs))
runningJobs.With(l).Set(float64(stats.TotalRunningJobs))
registeredRunners.With(l).Set(float64(stats.TotalRegisteredRunners))
busyRunners.With(l).Set(float64(stats.TotalBusyRunners))
idleRunners.With(l).Set(float64(stats.TotalIdleRunners))
}
func (e *exporter) PublishJobStarted(msg *actions.JobStarted) {
l := e.startedJobLabels(msg)
startedJobsTotal.With(l).Inc()
startupDuration := msg.JobMessageBase.RunnerAssignTime.Unix() - msg.JobMessageBase.ScaleSetAssignTime.Unix()
jobStartupDurationSeconds.With(l).Observe(float64(startupDuration))
}
func (e *exporter) PublishJobCompleted(msg *actions.JobCompleted) {
l := e.completedJobLabels(msg)
completedJobsTotal.With(l).Inc()
executionDuration := msg.JobMessageBase.FinishTime.Unix() - msg.JobMessageBase.RunnerAssignTime.Unix()
jobExecutionDurationSeconds.With(l).Observe(float64(executionDuration))
}
func (e *exporter) PublishDesiredRunners(count int) {
desiredRunners.With(e.scaleSetLabels()).Set(float64(count))
}
type discard struct{}
func (*discard) PublishStatic(int, int) {}
func (*discard) PublishStatistics(*actions.RunnerScaleSetStatistic) {}
func (*discard) PublishJobStarted(*actions.JobStarted) {}
func (*discard) PublishJobCompleted(*actions.JobCompleted) {}
func (*discard) PublishDesiredRunners(int) {}
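A hedged wiring sketch for the exporter (illustrative only; the metrics import path and the ctx/logger/minRunners/maxRunners values are assumptions rather than code from this changeset):

package main

import (
    "context"
    "errors"
    "net/http"

    "github.com/actions/actions-runner-controller/cmd/ghalistener/metrics" // assumed package path
    "github.com/go-logr/logr"
)

// startMetrics shows how a caller could construct and serve the exporter.
func startMetrics(ctx context.Context, logger logr.Logger, minRunners, maxRunners int) metrics.ServerPublisher {
    exporter := metrics.NewExporter(metrics.ExporterConfig{
        ScaleSetName:      "my-scale-set", // illustrative values
        ScaleSetNamespace: "arc-runners",
        Organization:      "my-org",
        ServerAddr:        ":8080",
        ServerEndpoint:    "/metrics",
        Logger:            logger,
    })
    // Serve /metrics until ctx is cancelled; the exporter shuts the server down on ctx.Done().
    go func() {
        if err := exporter.ListenAndServe(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
            logger.Error(err, "metrics server stopped")
        }
    }()
    // Static gauges only need to be published once.
    exporter.PublishStatic(minRunners, maxRunners)
    return exporter
}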

View File

@@ -0,0 +1,53 @@
// Code generated by mockery v2.36.1. DO NOT EDIT.
package mocks
import (
actions "github.com/actions/actions-runner-controller/github/actions"
mock "github.com/stretchr/testify/mock"
)
// Publisher is an autogenerated mock type for the Publisher type
type Publisher struct {
mock.Mock
}
// PublishDesiredRunners provides a mock function with given fields: count
func (_m *Publisher) PublishDesiredRunners(count int) {
_m.Called(count)
}
// PublishJobCompleted provides a mock function with given fields: msg
func (_m *Publisher) PublishJobCompleted(msg *actions.JobCompleted) {
_m.Called(msg)
}
// PublishJobStarted provides a mock function with given fields: msg
func (_m *Publisher) PublishJobStarted(msg *actions.JobStarted) {
_m.Called(msg)
}
// PublishStatic provides a mock function with given fields: min, max
func (_m *Publisher) PublishStatic(min int, max int) {
_m.Called(min, max)
}
// PublishStatistics provides a mock function with given fields: stats
func (_m *Publisher) PublishStatistics(stats *actions.RunnerScaleSetStatistic) {
_m.Called(stats)
}
// NewPublisher creates a new instance of Publisher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewPublisher(t interface {
mock.TestingT
Cleanup(func())
}) *Publisher {
mock := &Publisher{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -0,0 +1,69 @@
// Code generated by mockery v2.36.1. DO NOT EDIT.
package mocks
import (
context "context"
actions "github.com/actions/actions-runner-controller/github/actions"
mock "github.com/stretchr/testify/mock"
)
// ServerPublisher is an autogenerated mock type for the ServerPublisher type
type ServerPublisher struct {
mock.Mock
}
// ListenAndServe provides a mock function with given fields: ctx
func (_m *ServerPublisher) ListenAndServe(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// PublishDesiredRunners provides a mock function with given fields: count
func (_m *ServerPublisher) PublishDesiredRunners(count int) {
_m.Called(count)
}
// PublishJobCompleted provides a mock function with given fields: msg
func (_m *ServerPublisher) PublishJobCompleted(msg *actions.JobCompleted) {
_m.Called(msg)
}
// PublishJobStarted provides a mock function with given fields: msg
func (_m *ServerPublisher) PublishJobStarted(msg *actions.JobStarted) {
_m.Called(msg)
}
// PublishStatic provides a mock function with given fields: min, max
func (_m *ServerPublisher) PublishStatic(min int, max int) {
_m.Called(min, max)
}
// PublishStatistics provides a mock function with given fields: stats
func (_m *ServerPublisher) PublishStatistics(stats *actions.RunnerScaleSetStatistic) {
_m.Called(stats)
}
// NewServerPublisher creates a new instance of ServerPublisher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewServerPublisher(t interface {
mock.TestingT
Cleanup(func())
}) *ServerPublisher {
mock := &ServerPublisher{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -0,0 +1,229 @@
package worker
import (
"context"
"encoding/json"
"fmt"
"github.com/actions/actions-runner-controller/apis/actions.github.com/v1alpha1"
"github.com/actions/actions-runner-controller/cmd/ghalistener/listener"
"github.com/actions/actions-runner-controller/github/actions"
"github.com/actions/actions-runner-controller/logging"
jsonpatch "github.com/evanphx/json-patch"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
const workerName = "kubernetesworker"
type Option func(*Worker)
func WithLogger(logger logr.Logger) Option {
return func(w *Worker) {
logger = logger.WithName(workerName)
w.logger = &logger
}
}
type Config struct {
EphemeralRunnerSetNamespace string
EphemeralRunnerSetName string
MaxRunners int
MinRunners int
}
// The Worker's role is to process the messages it receives from the listener.
// It then initiates Kubernetes API requests to carry out the necessary actions.
type Worker struct {
clientset *kubernetes.Clientset
config Config
lastPatch int
logger *logr.Logger
}
var _ listener.Handler = (*Worker)(nil)
func New(config Config, options ...Option) (*Worker, error) {
w := &Worker{
config: config,
lastPatch: -1,
}
conf, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(conf)
if err != nil {
return nil, err
}
w.clientset = clientset
for _, option := range options {
option(w)
}
if err := w.applyDefaults(); err != nil {
return nil, err
}
return w, nil
}
func (w *Worker) applyDefaults() error {
if w.logger == nil {
logger, err := logging.NewLogger(logging.LogLevelDebug, logging.LogFormatJSON)
if err != nil {
return fmt.Errorf("NewLogger failed: %w", err)
}
logger = logger.WithName(workerName)
w.logger = &logger
}
return nil
}
// HandleJobStarted updates the job information for the ephemeral runner when a job is started.
// It takes a context and a jobInfo parameter which contains the details of the started job.
// This update marks the ephemeral runner so that the controller has enough context
// not to delete an ephemeral runner that is still running its job when scaling down.
// It returns an error if there is any issue with updating the job information.
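// The resulting merge patch contains only the status fields populated below (a small
// {"status":{...}} document), which keeps the PATCH request minimal.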
func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error {
w.logger.Info("Updating job info for the runner",
"runnerName", jobInfo.RunnerName,
"ownerName", jobInfo.OwnerName,
"repoName", jobInfo.RepositoryName,
"workflowRef", jobInfo.JobWorkflowRef,
"workflowRunId", jobInfo.WorkflowRunId,
"jobDisplayName", jobInfo.JobDisplayName,
"requestId", jobInfo.RunnerRequestId)
original, err := json.Marshal(&v1alpha1.EphemeralRunner{})
if err != nil {
return fmt.Errorf("failed to marshal empty ephemeral runner: %w", err)
}
patch, err := json.Marshal(
&v1alpha1.EphemeralRunner{
Status: v1alpha1.EphemeralRunnerStatus{
JobRequestId: jobInfo.RunnerRequestId,
JobRepositoryName: fmt.Sprintf("%s/%s", jobInfo.OwnerName, jobInfo.RepositoryName),
WorkflowRunId: jobInfo.WorkflowRunId,
JobWorkflowRef: jobInfo.JobWorkflowRef,
JobDisplayName: jobInfo.JobDisplayName,
},
},
)
if err != nil {
return fmt.Errorf("failed to marshal ephemeral runner patch: %w", err)
}
mergePatch, err := jsonpatch.CreateMergePatch(original, patch)
if err != nil {
return fmt.Errorf("failed to create merge patch json for ephemeral runner: %w", err)
}
w.logger.Info("Updating ephemeral runner with merge patch", "json", string(mergePatch))
patchedStatus := &v1alpha1.EphemeralRunner{}
err = w.clientset.RESTClient().
Patch(types.MergePatchType).
Prefix("apis", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version).
Namespace(w.config.EphemeralRunnerSetNamespace).
Resource("EphemeralRunners").
Name(jobInfo.RunnerName).
SubResource("status").
Body(mergePatch).
Do(ctx).
Into(patchedStatus)
if err != nil {
return fmt.Errorf("could not patch ephemeral runner status, patch JSON: %s, error: %w", string(mergePatch), err)
}
w.logger.Info("Ephemeral runner status updated with the merge patch successfully.")
return nil
}
// HandleDesiredRunnerCount handles the desired runner count by scaling the ephemeral runner set.
// The function calculates the target runner count based on the minimum and maximum runner count configuration.
// If the target runner count is the same as the last patched count, it skips patching and returns nil.
// Otherwise, it creates a merge patch JSON for updating the ephemeral runner set with the desired count.
// The function then scales the ephemeral runner set by applying the merge patch.
// Finally, it logs the scaled ephemeral runner set details and returns nil if successful.
// If any error occurs during the process, it returns an error with a descriptive message.
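// For example (illustrative numbers): with MinRunners=1, MaxRunners=5 and 2 assigned jobs,
// the target is min(1+2, 5) = 3 runners, so the configured minimum stays available as idle
// runners on top of the runners needed for assigned jobs, capped at MaxRunners. Under the
// previous semantics, max(min(MaxRunners, count), MinRunners), the same inputs targeted 2.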
func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) error {
// Max runners should always be set by the resource builder either to the configured value,
// or the maximum int32 (resourcebuilder.newAutoScalingListener()).
targetRunnerCount := min(w.config.MinRunners+count, w.config.MaxRunners)
logValues := []any{
"assigned job", count,
"decision", targetRunnerCount,
"min", w.config.MinRunners,
"max", w.config.MaxRunners,
"currentRunnerCount", w.lastPatch,
}
if targetRunnerCount == w.lastPatch {
w.logger.Info("Skipping patching of EphemeralRunnerSet as the desired count has not changed", logValues...)
return nil
}
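// Note: the base object below sets Replicas to -1 so that the replicas field always differs
// from the desired value and is therefore always included in the merge patch, even when
// scaling to 0.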
original, err := json.Marshal(
&v1alpha1.EphemeralRunnerSet{
Spec: v1alpha1.EphemeralRunnerSetSpec{
Replicas: -1,
},
},
)
if err != nil {
return fmt.Errorf("failed to marshal empty ephemeral runner set: %w", err)
}
patch, err := json.Marshal(
&v1alpha1.EphemeralRunnerSet{
Spec: v1alpha1.EphemeralRunnerSetSpec{
Replicas: targetRunnerCount,
},
},
)
if err != nil {
w.logger.Error(err, "could not marshal patch ephemeral runner set")
return err
}
mergePatch, err := jsonpatch.CreateMergePatch(original, patch)
if err != nil {
return fmt.Errorf("failed to create merge patch json for ephemeral runner set: %w", err)
}
w.logger.Info("Created merge patch json for EphemeralRunnerSet update", "json", string(mergePatch))
w.logger.Info("Scaling ephemeral runner set", logValues...)
patchedEphemeralRunnerSet := &v1alpha1.EphemeralRunnerSet{}
err = w.clientset.RESTClient().
Patch(types.MergePatchType).
Prefix("apis", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version).
Namespace(w.config.EphemeralRunnerSetNamespace).
Resource("ephemeralrunnersets").
Name(w.config.EphemeralRunnerSetName).
Body([]byte(mergePatch)).
Do(ctx).
Into(patchedEphemeralRunnerSet)
if err != nil {
return fmt.Errorf("could not patch ephemeral runner set , patch JSON: %s, error: %w", string(mergePatch), err)
}
w.logger.Info("Ephemeral runner set scaled.",
"namespace", w.config.EphemeralRunnerSetNamespace,
"name", w.config.EphemeralRunnerSetName,
"replicas", patchedEphemeralRunnerSet.Spec.Replicas,
)
return nil
}

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"strings"
"github.com/actions/actions-runner-controller/cmd/githubrunnerscalesetlistener/config"
@@ -206,7 +205,9 @@ func (s *Service) processMessage(message *actions.RunnerScaleSetMessage) error {
}
func (s *Service) scaleForAssignedJobCount(count int) error {
targetRunnerCount := int(math.Max(math.Min(float64(s.settings.MaxRunners), float64(count)), float64(s.settings.MinRunners)))
// Max runners should always be set by the resource builder either to the configured value,
// or the maximum int32 (resourcebuilder.newAutoScalingListener()).
targetRunnerCount := min(s.settings.MinRunners+count, s.settings.MaxRunners)
s.metricsExporter.publishDesiredRunners(targetRunnerCount)
if targetRunnerCount != s.currentRunnerCount {
s.logger.Info("try scale runner request up/down base on assigned job count",

View File

@@ -397,7 +397,7 @@ func TestProcessMessage_MultipleMessages(t *testing.T) {
require.NoError(t, err)
mockRsClient.On("AcquireJobsForRunnerScaleSet", ctx, mock.MatchedBy(func(ids []int64) bool { return ids[0] == 3 && ids[1] == 4 })).Return(nil).Once()
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 2).Run(func(args mock.Arguments) { cancel() }).Return(nil).Once()
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 3).Run(func(args mock.Arguments) { cancel() }).Return(nil).Once()
err = service.processMessage(&actions.RunnerScaleSetMessage{
MessageId: 1,
@@ -523,9 +523,9 @@ func TestScaleForAssignedJobCount_ScaleWithinMinMax(t *testing.T) {
require.NoError(t, err)
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 1).Return(nil).Once()
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 3).Return(nil).Once()
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 4).Return(nil).Once()
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 5).Return(nil).Once()
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 1).Return(nil).Once()
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 2).Return(nil).Once()
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 5).Return(nil).Once()
err = service.scaleForAssignedJobCount(0)
@@ -569,7 +569,7 @@ func TestScaleForAssignedJobCount_ScaleFailed(t *testing.T) {
)
require.NoError(t, err)
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 2).Return(fmt.Errorf("error"))
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 3).Return(fmt.Errorf("error"))
err = service.scaleForAssignedJobCount(2)
@@ -605,8 +605,23 @@ func TestProcessMessage_JobStartedMessage(t *testing.T) {
service.currentRunnerCount = 1
mockKubeManager.On("UpdateEphemeralRunnerWithJobInfo", ctx, service.settings.Namespace, "runner1", "owner1", "repo1", ".github/workflows/ci.yaml", "job1", int64(100), int64(3)).Run(func(args mock.Arguments) { cancel() }).Return(nil).Once()
mockKubeManager.On(
"UpdateEphemeralRunnerWithJobInfo",
ctx,
service.settings.Namespace,
"runner1",
"owner1",
"repo1",
".github/workflows/ci.yaml",
"job1",
int64(100),
int64(3),
).Run(
func(_ mock.Arguments) { cancel() },
).Return(nil).Once()
mockRsClient.On("AcquireJobsForRunnerScaleSet", ctx, mock.MatchedBy(func(ids []int64) bool { return len(ids) == 0 })).Return(nil).Once()
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 2).Return(nil)
err = service.processMessage(&actions.RunnerScaleSetMessage{
MessageId: 1,

View File

@@ -176,6 +176,8 @@ func run(ctx context.Context, rc config.Config, logger logr.Logger, opts runOpti
Version: build.Version,
CommitSHA: build.CommitSHA,
ScaleSetID: rc.RunnerScaleSetId,
HasProxy: hasProxy(),
Subsystem: "githubrunnerscalesetlistener",
})
if err != nil {
return fmt.Errorf("failed to create an Actions Service client: %w", err)
@@ -235,3 +237,8 @@ func newActionsClientFromConfig(config config.Config, creds *actions.ActionsAuth
return actions.NewClient(config.ConfigureUrl, creds, options...)
}
func hasProxy() bool {
cfg := httpproxy.FromEnvironment()
return cfg.HTTPProxy != "" || cfg.HTTPSProxy != ""
}

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v2.33.2. DO NOT EDIT.
// Code generated by mockery v2.36.1. DO NOT EDIT.
package main

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v2.33.2. DO NOT EDIT.
// Code generated by mockery v2.36.1. DO NOT EDIT.
package main

View File

@@ -41,7 +41,7 @@ import (
)
const (
autoscalingListenerContainerName = "autoscaler"
autoscalingListenerContainerName = "listener"
autoscalingListenerFinalizerName = "autoscalinglistener.actions.github.com/finalizer"
)

View File

@@ -425,7 +425,7 @@ var _ = Describe("Test AutoScalingListener customization", func() {
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "listener",
Name: autoscalingListenerContainerName,
ImagePullPolicy: corev1.PullAlways,
SecurityContext: &corev1.SecurityContext{
RunAsUser: &runAsUser,
@@ -555,7 +555,7 @@ var _ = Describe("Test AutoScalingListener customization", func() {
Expect(pod.Spec.SecurityContext.RunAsUser).To(Equal(&runAsUser), "Pod should have the correct security context")
Expect(pod.Spec.Containers[0].Name).NotTo(Equal("listener"), "Pod should have the correct container name")
Expect(pod.Spec.Containers[0].Name).To(Equal(autoscalingListenerContainerName), "Pod should have the correct container name")
Expect(pod.Spec.Containers[0].SecurityContext.RunAsUser).To(Equal(&runAsUser), "Pod should have the correct security context")
Expect(pod.Spec.Containers[0].ImagePullPolicy).To(Equal(corev1.PullAlways), "Pod should have the correct image pull policy")
@@ -854,7 +854,7 @@ var _ = Describe("Test AutoScalingListener controller with template modification
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "listener",
Name: autoscalingListenerContainerName,
ImagePullPolicy: corev1.PullAlways,
SecurityContext: &corev1.SecurityContext{
RunAsUser: &runAsUser1001,

View File

@@ -466,6 +466,8 @@ func (r *AutoscalingRunnerSetReconciler) createRunnerScaleSet(ctx context.Contex
Version: build.Version,
CommitSHA: build.CommitSHA,
ScaleSetID: runnerScaleSet.Id,
HasProxy: autoscalingRunnerSet.Spec.Proxy != nil,
Subsystem: "controller",
})
logger.Info("Created/Reused a runner scale set", "id", runnerScaleSet.Id, "runnerGroupName", runnerScaleSet.RunnerGroupName)

View File

@@ -66,3 +66,9 @@ const DefaultScaleSetListenerLogFormat = string(logging.LogFormatText)
// ownerKey is field selector matching the owner name of a particular resource
const resourceOwnerKey = ".metadata.controller"
// EphemeralRunner pod creation failure reasons
const (
ReasonTooManyPodFailures = "TooManyPodFailures"
ReasonInvalidPodFailure = "InvalidPod"
)

View File

@@ -192,7 +192,7 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
case len(ephemeralRunner.Status.Failures) > 5:
log.Info("EphemeralRunner has failed more than 5 times. Marking it as failed")
errMessage := fmt.Sprintf("Pod has failed to start more than 5 times: %s", pod.Status.Message)
if err := r.markAsFailed(ctx, ephemeralRunner, errMessage, log); err != nil {
if err := r.markAsFailed(ctx, ephemeralRunner, errMessage, ReasonTooManyPodFailures, log); err != nil {
log.Error(err, "Failed to set ephemeral runner to phase Failed")
return ctrl.Result{}, err
}
@@ -201,7 +201,22 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
default:
// Pod was not found. Create if the pod has never been created
log.Info("Creating new EphemeralRunner pod.")
return r.createPod(ctx, ephemeralRunner, secret, log)
result, err := r.createPod(ctx, ephemeralRunner, secret, log)
switch {
case err == nil:
return result, nil
case kerrors.IsInvalid(err) || kerrors.IsForbidden(err):
log.Error(err, "Failed to create a pod due to unrecoverable failure")
errMessage := fmt.Sprintf("Failed to create the pod: %v", err)
if err := r.markAsFailed(ctx, ephemeralRunner, errMessage, ReasonInvalidPodFailure, log); err != nil {
log.Error(err, "Failed to set ephemeral runner to phase Failed")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
default:
log.Error(err, "Failed to create the pod")
return ctrl.Result{}, err
}
}
}
@@ -424,11 +439,11 @@ func (r *EphemeralRunnerReconciler) cleanupRunnerLinkedSecrets(ctx context.Conte
return false, multierr.Combine(errs...)
}
func (r *EphemeralRunnerReconciler) markAsFailed(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, errMessage string, log logr.Logger) error {
func (r *EphemeralRunnerReconciler) markAsFailed(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, errMessage string, reason string, log logr.Logger) error {
log.Info("Updating ephemeral runner status to Failed")
if err := patchSubResource(ctx, r.Status(), ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) {
obj.Status.Phase = corev1.PodFailed
obj.Status.Reason = "TooManyPodFailures"
obj.Status.Reason = reason
obj.Status.Message = errMessage
}); err != nil {
return fmt.Errorf("failed to update ephemeral runner status Phase/Message: %v", err)

View File

@@ -189,6 +189,25 @@ var _ = Describe("EphemeralRunner", func() {
).Should(BeEquivalentTo(true))
})
It("It should failed if a pod template is invalid", func() {
invalideEphemeralRunner := newExampleRunner("invalid-ephemeral-runner", autoscalingNS.Name, configSecret.Name)
invalideEphemeralRunner.Spec.Spec.PriorityClassName = "notexist"
err := k8sClient.Create(ctx, invalideEphemeralRunner)
Expect(err).To(BeNil())
updated := new(v1alpha1.EphemeralRunner)
Eventually(func() (corev1.PodPhase, error) {
err := k8sClient.Get(ctx, client.ObjectKey{Name: invalideEphemeralRunner.Name, Namespace: invalideEphemeralRunner.Namespace}, updated)
if err != nil {
return "", nil
}
return updated.Status.Phase, nil
}, timeout, interval).Should(BeEquivalentTo(corev1.PodFailed))
Expect(updated.Status.Reason).Should(Equal("InvalidPod"))
Expect(updated.Status.Message).Should(Equal("Failed to create the pod: pods \"invalid-ephemeral-runner\" is forbidden: no PriorityClass with name notexist was found"))
})
It("It should clean up resources when deleted", func() {
// wait for pod to be created
pod := new(corev1.Pod)

View File

@@ -38,8 +38,11 @@ var commonLabelKeys = [...]string{
const labelValueKubernetesPartOf = "gha-runner-scale-set"
var scaleSetListenerLogLevel = DefaultScaleSetListenerLogLevel
var scaleSetListenerLogFormat = DefaultScaleSetListenerLogFormat
var (
scaleSetListenerLogLevel = DefaultScaleSetListenerLogLevel
scaleSetListenerLogFormat = DefaultScaleSetListenerLogFormat
scaleSetListenerEntrypoint = "/ghalistener"
)
func SetListenerLoggingParameters(level string, format string) bool {
switch level {
@@ -59,6 +62,12 @@ func SetListenerLoggingParameters(level string, format string) bool {
return true
}
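// SetListenerEntrypoint overrides the default /ghalistener entrypoint; an empty value keeps
// the default. It is set from the LISTENER_ENTRYPOINT environment variable in main.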
func SetListenerEntrypoint(entrypoint string) {
if entrypoint != "" {
scaleSetListenerEntrypoint = entrypoint
}
}
type resourceBuilder struct{}
func (b *resourceBuilder) newAutoScalingListener(autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet, namespace, image string, imagePullSecrets []corev1.LocalObjectReference) (*v1alpha1.AutoscalingListener, error) {
@@ -225,7 +234,7 @@ func (b *resourceBuilder) newScaleSetListenerPod(autoscalingListener *v1alpha1.A
Image: autoscalingListener.Spec.Image,
Env: listenerEnv,
Command: []string{
"/github-runnerscaleset-listener",
scaleSetListenerEntrypoint,
},
Ports: ports,
VolumeMounts: []corev1.VolumeMount{
@@ -300,7 +309,7 @@ func mergeListenerPodWithTemplate(pod *corev1.Pod, tmpl *corev1.PodTemplateSpec)
c := &tmpl.Spec.Containers[i]
switch c.Name {
case "listener":
case autoscalingListenerContainerName:
mergeListenerContainer(listenerContainer, c)
default:
pod.Spec.Containers = append(pod.Spec.Containers, *c)

View File

@@ -1,6 +1,6 @@
# Changing semantics of the `minRunners` field
**Status**: Proposed
**Status**: Accepted
## Context

View File

@@ -43,6 +43,27 @@ You can follow [this troubleshooting guide](https://docs.github.com/en/actions/h
## Changelog
### v0.8.0
1. Change listener container name [#3167](https://github.com/actions/actions-runner-controller/pull/3167)
1. Fix empty env and volumeMounts object on default setup [#3166](https://github.com/actions/actions-runner-controller/pull/3166)
1. Fix override listener pod spec [#3161](https://github.com/actions/actions-runner-controller/pull/3161)
1. Change minRunners behavior and fix the new listener min runners [#3139](https://github.com/actions/actions-runner-controller/pull/3139)
1. Update user agent for new ghalistener [#3138](https://github.com/actions/actions-runner-controller/pull/3138)
1. Bump golang.org/x/oauth2 from 0.14.0 to 0.15.0 [#3127](https://github.com/actions/actions-runner-controller/pull/3127)
1. Bump golang.org/x/net from 0.18.0 to 0.19.0 [#3126](https://github.com/actions/actions-runner-controller/pull/3126)
1. Bump k8s.io/client-go from 0.28.3 to 0.28.4 [#3125](https://github.com/actions/actions-runner-controller/pull/3125)
1. Modify user agent format with subsystem and is proxy configured information [#3116](https://github.com/actions/actions-runner-controller/pull/3116)
1. Record the error when the creation pod fails [#3112](https://github.com/actions/actions-runner-controller/pull/3112)
1. Fix typo in helm chart comment [#3104](https://github.com/actions/actions-runner-controller/pull/3104)
1. Set actions client timeout to 5 minutes, add logging to client [#3103](https://github.com/actions/actions-runner-controller/pull/3103)
1. Refactor listener app with configurable fallback [#3096](https://github.com/actions/actions-runner-controller/pull/3096)
1. Bump github.com/onsi/gomega from 1.29.0 to 1.30.0 [#3094](https://github.com/actions/actions-runner-controller/pull/3094)
1. Bump k8s.io/api from 0.28.3 to 0.28.4 [#3093](https://github.com/actions/actions-runner-controller/pull/3093)
1. Bump k8s.io/apimachinery from 0.28.3 to 0.28.4 [#3092](https://github.com/actions/actions-runner-controller/pull/3092)
1. Bump github.com/gruntwork-io/terratest from 0.41.24 to 0.46.7 [#3091](https://github.com/actions/actions-runner-controller/pull/3091)
1. Record a reason for pod failure in EphemeralRunner [#3074](https://github.com/actions/actions-runner-controller/pull/3074)
1. ADR: Changing semantics of min runners to be min idle runners [#3040](https://github.com/actions/actions-runner-controller/pull/3040)
### v0.7.0
1. Add ResizePolicy and RestartPolicy on mergeListenerContainer [#3075](https://github.com/actions/actions-runner-controller/pull/3075)
1. feat: GHA controller Helm Chart quoted labels [#3061](https://github.com/actions/actions-runner-controller/pull/3061)

View File

@@ -109,23 +109,31 @@ type ProxyFunc func(req *http.Request) (*url.URL, error)
type ClientOption func(*Client)
type UserAgentInfo struct {
Version string
CommitSHA string
// Version is the version of the controller
Version string
// CommitSHA is the git commit SHA of the controller
CommitSHA string
// ScaleSetID is the ID of the scale set
ScaleSetID int
// HasProxy is true if the controller is running behind a proxy
HasProxy bool
// Subsystem is the subsystem such as listener, controller, etc.
// Each system may pick its own subsystem name.
Subsystem string
}
func (u UserAgentInfo) String() string {
var scaleSetID = "NA"
scaleSetID := "NA"
if u.ScaleSetID > 0 {
scaleSetID = strconv.Itoa(u.ScaleSetID)
}
return fmt.Sprintf(
"actions-runner-controller/%s CommitSHA/%s ScaleSetID/%s",
u.Version,
u.CommitSHA,
scaleSetID,
)
proxy := "Proxy/disabled"
if u.HasProxy {
proxy = "Proxy/enabled"
}
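// With illustrative values this renders as, for example:
//   actions-runner-controller/0.8.0 (abc1234; controller) ScaleSetID/42 (Proxy/enabled)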
return fmt.Sprintf("actions-runner-controller/%s (%s; %s) ScaleSetID/%s (%s)", u.Version, u.CommitSHA, u.Subsystem, scaleSetID, proxy)
}
func WithLogger(logger logr.Logger) ClientOption {

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v2.33.2. DO NOT EDIT.
// Code generated by mockery v2.36.1. DO NOT EDIT.
package actions

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v2.33.2. DO NOT EDIT.
// Code generated by mockery v2.36.1. DO NOT EDIT.
package actions

View File

@@ -0,0 +1,24 @@
package actions_test
import (
"testing"
"github.com/actions/actions-runner-controller/github/actions"
"github.com/stretchr/testify/assert"
)
func TestUserAgentInfoString(t *testing.T) {
userAgentInfo := actions.UserAgentInfo{
Version: "0.1.0",
CommitSHA: "1234567890abcdef",
ScaleSetID: 10,
HasProxy: true,
Subsystem: "test",
}
userAgent := userAgentInfo.String()
expectedProduct := "actions-runner-controller/0.1.0 (1234567890abcdef; test)"
assert.Contains(t, userAgent, expectedProduct)
expectedScaleSet := "ScaleSetID/10 (Proxy/enabled)"
assert.Contains(t, userAgent, expectedScaleSet)
}

go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/google/uuid v1.4.0
github.com/gorilla/mux v1.8.1
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79
github.com/gruntwork-io/terratest v0.41.24
github.com/gruntwork-io/terratest v0.46.7
github.com/hashicorp/go-retryablehttp v0.7.5
github.com/kelseyhightower/envconfig v1.4.0
github.com/onsi/ginkgo v1.16.5
@@ -25,14 +25,14 @@ require (
github.com/teambition/rrule-go v1.8.2
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/net v0.18.0
golang.org/x/oauth2 v0.14.0
golang.org/x/net v0.19.0
golang.org/x/oauth2 v0.15.0
golang.org/x/sync v0.5.0
gomodules.xyz/jsonpatch/v2 v2.4.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.28.4
k8s.io/apimachinery v0.28.4
k8s.io/client-go v0.28.3
k8s.io/client-go v0.28.4
sigs.k8s.io/controller-runtime v0.16.3
sigs.k8s.io/yaml v1.4.0
)
@@ -89,10 +89,10 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.1 // indirect
github.com/urfave/cli v1.22.2 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/term v0.14.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.4.0 // indirect
golang.org/x/tools v0.15.0 // indirect

go.sum
View File

@@ -106,8 +106,8 @@ github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWS
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gruntwork-io/go-commons v0.8.0 h1:k/yypwrPqSeYHevLlEDmvmgQzcyTwrlZGRaxEM6G0ro=
github.com/gruntwork-io/go-commons v0.8.0/go.mod h1:gtp0yTtIBExIZp7vyIV9I0XQkVwiQZze678hvDXof78=
github.com/gruntwork-io/terratest v0.41.24 h1:j6T6qe4deVvynTG2UmnjGwZy83he6xKgTaYWiSdFv/w=
github.com/gruntwork-io/terratest v0.41.24/go.mod h1:O6gajNBjO1wvc7Wl9WtbO+ORcdnhAV2GQiBE71ycwIk=
github.com/gruntwork-io/terratest v0.46.7 h1:oqGPBBO87SEsvBYaA0R5xOq+Lm2Xc5dmFVfxEolfZeU=
github.com/gruntwork-io/terratest v0.46.7/go.mod h1:6gI5MlLeyF+SLwqocA5GBzcTix+XiuxCy1BPwKuT+WM=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
@@ -233,8 +233,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -255,10 +255,10 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg=
golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ=
golang.org/x/oauth2 v0.14.0 h1:P0Vrf/2538nmC0H+pEQ3MNFRRnVR7RlqyVw+bvm26z0=
golang.org/x/oauth2 v0.14.0/go.mod h1:lAtNWgaWfL4cm7j2OV8TxGi9Qb7ECORx8DktCY74OwM=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/oauth2 v0.15.0 h1:s8pnnxNVzjWyrvYdFUQq5llS1PX2zhPXmccZv99h7uQ=
golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -288,15 +288,15 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/term v0.14.0 h1:LGK9IlZ8T9jvdy6cTdfKUCltatMFOehAQo9SRC46UQ8=
golang.org/x/term v0.14.0/go.mod h1:TySc+nGkYR6qt8km8wUhuFRTVSMIX3XPR58y2lC8vww=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
@@ -357,8 +357,8 @@ k8s.io/apiextensions-apiserver v0.28.3 h1:Od7DEnhXHnHPZG+W9I97/fSQkVpVPQx2diy+2E
k8s.io/apiextensions-apiserver v0.28.3/go.mod h1:NE1XJZ4On0hS11aWWJUTNkmVB03j9LM7gJSisbRt8Lc=
k8s.io/apimachinery v0.28.4 h1:zOSJe1mc+GxuMnFzD4Z/U1wst50X28ZNsn5bhgIIao8=
k8s.io/apimachinery v0.28.4/go.mod h1:wI37ncBvfAoswfq626yPTe6Bz1c22L7uaJ8dho83mgg=
k8s.io/client-go v0.28.3 h1:2OqNb72ZuTZPKCl+4gTKvqao0AMOl9f3o2ijbAj3LI4=
k8s.io/client-go v0.28.3/go.mod h1:LTykbBp9gsA7SwqirlCXBWtK0guzfhpoW4qSm7i9dxo=
k8s.io/client-go v0.28.4 h1:Np5ocjlZcTrkyRJ3+T3PkXDpe4UpatQxj85+xjaD2wY=
k8s.io/client-go v0.28.4/go.mod h1:0VDZFpgoZfelyP5Wqu0/r/TRYcLYuJ2U1KEeoaPa1N4=
k8s.io/component-base v0.28.3 h1:rDy68eHKxq/80RiMb2Ld/tbH8uAE75JdCqJyi6lXMzI=
k8s.io/component-base v0.28.3/go.mod h1:fDJ6vpVNSk6cRo5wmDa6eKIG7UlIQkaFmZN2fYgIUD8=
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=

View File

@@ -203,6 +203,8 @@ func main() {
log.Info("Using default AutoscalingListener logging parameters", "LogLevel", actionsgithubcom.DefaultScaleSetListenerLogLevel, "LogFormat", actionsgithubcom.DefaultScaleSetListenerLogFormat)
}
actionsgithubcom.SetListenerEntrypoint(os.Getenv("LISTENER_ENTRYPOINT"))
var webhookServer webhook.Server
if port != 0 {
webhookServer = webhook.NewServer(webhook.Options{