mirror of
https://github.com/actions/actions-runner-controller.git
synced 2025-12-26 03:27:31 +08:00
chore: goinstrument
This commit is contained in:
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/actions/actions-runner-controller/cmd/ghalistener/worker"
|
||||
"github.com/actions/actions-runner-controller/github/actions"
|
||||
"github.com/go-logr/logr"
|
||||
"go.opentelemetry.io/otel"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
@@ -105,6 +106,9 @@ func New(config config.Config) (*App, error) {
|
||||
}
|
||||
|
||||
func (app *App) Run(ctx context.Context) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "App.Run")
|
||||
defer span.End()
|
||||
|
||||
var errs []error
|
||||
if app.worker == nil {
|
||||
errs = append(errs, fmt.Errorf("worker not initialized"))
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
listener "github.com/actions/actions-runner-controller/cmd/ghalistener/listener"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
// Listener is an autogenerated mock type for the Listener type
|
||||
@@ -16,6 +17,9 @@ type Listener struct {
|
||||
|
||||
// Listen provides a mock function with given fields: ctx, handler
|
||||
func (_m *Listener) Listen(ctx context.Context, handler listener.Handler) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.Listen")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, handler)
|
||||
|
||||
var r0 error
|
||||
|
||||
@@ -4,6 +4,7 @@ package mocks
|
||||
|
||||
import (
|
||||
actions "github.com/actions/actions-runner-controller/github/actions"
|
||||
"go.opentelemetry.io/otel"
|
||||
|
||||
context "context"
|
||||
|
||||
@@ -17,6 +18,9 @@ type Worker struct {
|
||||
|
||||
// HandleDesiredRunnerCount provides a mock function with given fields: ctx, count, acquireCount
|
||||
func (_m *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, acquireCount int) (int, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Worker.HandleDesiredRunnerCount")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, count, acquireCount)
|
||||
|
||||
var r0 int
|
||||
@@ -41,6 +45,9 @@ func (_m *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, acqui
|
||||
|
||||
// HandleJobStarted provides a mock function with given fields: ctx, jobInfo
|
||||
func (_m *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Worker.HandleJobStarted")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, jobInfo)
|
||||
|
||||
var r0 error
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/actions/actions-runner-controller/github/actions"
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/google/uuid"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -125,6 +126,9 @@ type Handler interface {
|
||||
// The handler is responsible for handling the initial message and subsequent messages.
|
||||
// If an error occurs during any step, Listen returns an error.
|
||||
func (l *Listener) Listen(ctx context.Context, handler Handler) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.Listen")
|
||||
defer span.End()
|
||||
|
||||
if err := l.createSession(ctx); err != nil {
|
||||
return fmt.Errorf("createSession failed: %w", err)
|
||||
}
|
||||
@@ -182,6 +186,9 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error {
|
||||
}
|
||||
|
||||
func (l *Listener) handleMessage(ctx context.Context, handler Handler, msg *actions.RunnerScaleSetMessage) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.handleMessage")
|
||||
defer span.End()
|
||||
|
||||
parsedMsg, err := l.parseMessage(ctx, msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse message: %w", err)
|
||||
@@ -223,6 +230,9 @@ func (l *Listener) handleMessage(ctx context.Context, handler Handler, msg *acti
|
||||
}
|
||||
|
||||
func (l *Listener) createSession(ctx context.Context) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.createSession")
|
||||
defer span.End()
|
||||
|
||||
var session *actions.RunnerScaleSetSession
|
||||
var retries int
|
||||
|
||||
@@ -268,6 +278,9 @@ func (l *Listener) createSession(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessage, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.getMessage")
|
||||
defer span.End()
|
||||
|
||||
l.logger.Info("Getting next message", "lastMessageID", l.lastMessageID)
|
||||
msg, err := l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID, l.maxCapacity)
|
||||
if err == nil { // if NO error
|
||||
@@ -294,6 +307,9 @@ func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessa
|
||||
}
|
||||
|
||||
func (l *Listener) deleteLastMessage(ctx context.Context) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.deleteLastMessage")
|
||||
defer span.End()
|
||||
|
||||
l.logger.Info("Deleting last message", "lastMessageID", l.lastMessageID)
|
||||
err := l.client.DeleteMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID)
|
||||
if err == nil { // if NO error
|
||||
@@ -325,6 +341,9 @@ type parsedMessage struct {
|
||||
}
|
||||
|
||||
func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSetMessage) (*parsedMessage, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.parseMessage")
|
||||
defer span.End()
|
||||
|
||||
if msg.MessageType != "RunnerScaleSetJobMessages" {
|
||||
l.logger.Info("Skipping message", "messageType", msg.MessageType)
|
||||
return nil, fmt.Errorf("invalid message type: %s", msg.MessageType)
|
||||
@@ -398,6 +417,9 @@ func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSet
|
||||
}
|
||||
|
||||
func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*actions.JobAvailable) ([]int64, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.acquireAvailableJobs")
|
||||
defer span.End()
|
||||
|
||||
ids := make([]int64, 0, len(jobsAvailable))
|
||||
for _, job := range jobsAvailable {
|
||||
ids = append(ids, job.RunnerRequestId)
|
||||
@@ -428,6 +450,9 @@ func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*ac
|
||||
}
|
||||
|
||||
func (l *Listener) refreshSession(ctx context.Context) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.refreshSession")
|
||||
defer span.End()
|
||||
|
||||
l.logger.Info("Message queue token is expired during GetNextMessage, refreshing...")
|
||||
session, err := l.client.RefreshMessageSession(ctx, l.session.RunnerScaleSet.Id, l.session.SessionId)
|
||||
if err != nil {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
context "context"
|
||||
|
||||
actions "github.com/actions/actions-runner-controller/github/actions"
|
||||
"go.opentelemetry.io/otel"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
@@ -19,6 +20,9 @@ type Client struct {
|
||||
|
||||
// AcquireJobs provides a mock function with given fields: ctx, runnerScaleSetId, messageQueueAccessToken, requestIds
|
||||
func (_m *Client) AcquireJobs(ctx context.Context, runnerScaleSetId int, messageQueueAccessToken string, requestIds []int64) ([]int64, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Client.AcquireJobs")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, runnerScaleSetId, messageQueueAccessToken, requestIds)
|
||||
|
||||
var r0 []int64
|
||||
@@ -45,6 +49,9 @@ func (_m *Client) AcquireJobs(ctx context.Context, runnerScaleSetId int, message
|
||||
|
||||
// CreateMessageSession provides a mock function with given fields: ctx, runnerScaleSetId, owner
|
||||
func (_m *Client) CreateMessageSession(ctx context.Context, runnerScaleSetId int, owner string) (*actions.RunnerScaleSetSession, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Client.CreateMessageSession")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, runnerScaleSetId, owner)
|
||||
|
||||
var r0 *actions.RunnerScaleSetSession
|
||||
@@ -71,6 +78,9 @@ func (_m *Client) CreateMessageSession(ctx context.Context, runnerScaleSetId int
|
||||
|
||||
// DeleteMessage provides a mock function with given fields: ctx, messageQueueUrl, messageQueueAccessToken, messageId
|
||||
func (_m *Client) DeleteMessage(ctx context.Context, messageQueueUrl string, messageQueueAccessToken string, messageId int64) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Client.DeleteMessage")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, messageQueueUrl, messageQueueAccessToken, messageId)
|
||||
|
||||
var r0 error
|
||||
@@ -85,6 +95,9 @@ func (_m *Client) DeleteMessage(ctx context.Context, messageQueueUrl string, mes
|
||||
|
||||
// DeleteMessageSession provides a mock function with given fields: ctx, runnerScaleSetId, sessionId
|
||||
func (_m *Client) DeleteMessageSession(ctx context.Context, runnerScaleSetId int, sessionId *uuid.UUID) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Client.DeleteMessageSession")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, runnerScaleSetId, sessionId)
|
||||
|
||||
var r0 error
|
||||
@@ -99,6 +112,9 @@ func (_m *Client) DeleteMessageSession(ctx context.Context, runnerScaleSetId int
|
||||
|
||||
// GetAcquirableJobs provides a mock function with given fields: ctx, runnerScaleSetId
|
||||
func (_m *Client) GetAcquirableJobs(ctx context.Context, runnerScaleSetId int) (*actions.AcquirableJobList, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Client.GetAcquirableJobs")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, runnerScaleSetId)
|
||||
|
||||
var r0 *actions.AcquirableJobList
|
||||
@@ -125,6 +141,9 @@ func (_m *Client) GetAcquirableJobs(ctx context.Context, runnerScaleSetId int) (
|
||||
|
||||
// GetMessage provides a mock function with given fields: ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId, maxCapacity
|
||||
func (_m *Client) GetMessage(ctx context.Context, messageQueueUrl string, messageQueueAccessToken string, lastMessageId int64, maxCapacity int) (*actions.RunnerScaleSetMessage, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Client.GetMessage")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId, maxCapacity)
|
||||
|
||||
var r0 *actions.RunnerScaleSetMessage
|
||||
@@ -151,6 +170,9 @@ func (_m *Client) GetMessage(ctx context.Context, messageQueueUrl string, messag
|
||||
|
||||
// RefreshMessageSession provides a mock function with given fields: ctx, runnerScaleSetId, sessionId
|
||||
func (_m *Client) RefreshMessageSession(ctx context.Context, runnerScaleSetId int, sessionId *uuid.UUID) (*actions.RunnerScaleSetSession, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Client.RefreshMessageSession")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, runnerScaleSetId, sessionId)
|
||||
|
||||
var r0 *actions.RunnerScaleSetSession
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
context "context"
|
||||
|
||||
actions "github.com/actions/actions-runner-controller/github/actions"
|
||||
"go.opentelemetry.io/otel"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
@@ -17,6 +18,9 @@ type Handler struct {
|
||||
|
||||
// HandleDesiredRunnerCount provides a mock function with given fields: ctx, count, jobsCompleted
|
||||
func (_m *Handler) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Handler.HandleDesiredRunnerCount")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, count, jobsCompleted)
|
||||
|
||||
var r0 int
|
||||
@@ -41,6 +45,9 @@ func (_m *Handler) HandleDesiredRunnerCount(ctx context.Context, count int, jobs
|
||||
|
||||
// HandleJobStarted provides a mock function with given fields: ctx, jobInfo
|
||||
func (_m *Handler) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Handler.HandleJobStarted")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, jobInfo)
|
||||
|
||||
var r0 error
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -336,6 +337,9 @@ func NewExporter(config ExporterConfig) ServerPublisher {
|
||||
}
|
||||
|
||||
func (e *exporter) ListenAndServe(ctx context.Context) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "exporter.ListenAndServe")
|
||||
defer span.End()
|
||||
|
||||
e.logger.Info("starting metrics server", "addr", e.srv.Addr)
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
context "context"
|
||||
|
||||
actions "github.com/actions/actions-runner-controller/github/actions"
|
||||
"go.opentelemetry.io/otel"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
@@ -17,6 +18,9 @@ type ServerPublisher struct {
|
||||
|
||||
// ListenAndServe provides a mock function with given fields: ctx
|
||||
func (_m *ServerPublisher) ListenAndServe(ctx context.Context) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "ServerPublisher.ListenAndServe")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx)
|
||||
|
||||
var r0 error
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/actions/actions-runner-controller/logging"
|
||||
jsonpatch "github.com/evanphx/json-patch"
|
||||
"github.com/go-logr/logr"
|
||||
"go.opentelemetry.io/otel"
|
||||
kerrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
@@ -96,6 +97,9 @@ func (w *Worker) applyDefaults() error {
|
||||
// about the ephemeral runner that should not be deleted when scaling down.
|
||||
// It returns an error if there is any issue with updating the job information.
|
||||
func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Worker.HandleJobStarted")
|
||||
defer span.End()
|
||||
|
||||
w.logger.Info("Updating job info for the runner",
|
||||
"runnerName", jobInfo.RunnerName,
|
||||
"ownerName", jobInfo.OwnerName,
|
||||
@@ -164,6 +168,9 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
|
||||
// Finally, it logs the scaled ephemeral runner set details and returns nil if successful.
|
||||
// If any error occurs during the process, it returns an error with a descriptive message.
|
||||
func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count, jobsCompleted int) (int, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "Worker.HandleDesiredRunnerCount")
|
||||
defer span.End()
|
||||
|
||||
patchID := w.setDesiredWorkerState(count, jobsCompleted)
|
||||
|
||||
original, err := json.Marshal(
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/actions/actions-runner-controller/apis/actions.github.com/v1alpha1"
|
||||
jsonpatch "github.com/evanphx/json-patch"
|
||||
"github.com/go-logr/logr"
|
||||
"go.opentelemetry.io/otel"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
@@ -38,6 +39,9 @@ func NewKubernetesManager(logger *logr.Logger) (*AutoScalerKubernetesManager, er
|
||||
}
|
||||
|
||||
func (k *AutoScalerKubernetesManager) ScaleEphemeralRunnerSet(ctx context.Context, namespace, resourceName string, runnerCount int) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "AutoScalerKubernetesManager.ScaleEphemeralRunnerSet")
|
||||
defer span.End()
|
||||
|
||||
original := &v1alpha1.EphemeralRunnerSet{
|
||||
Spec: v1alpha1.EphemeralRunnerSetSpec{
|
||||
Replicas: -1,
|
||||
@@ -83,6 +87,9 @@ func (k *AutoScalerKubernetesManager) ScaleEphemeralRunnerSet(ctx context.Contex
|
||||
}
|
||||
|
||||
func (k *AutoScalerKubernetesManager) UpdateEphemeralRunnerWithJobInfo(ctx context.Context, namespace, resourceName, ownerName, repositoryName, jobWorkflowRef, jobDisplayName string, workflowRunId, jobRequestId int64) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "AutoScalerKubernetesManager.UpdateEphemeralRunnerWithJobInfo")
|
||||
defer span.End()
|
||||
|
||||
original := &v1alpha1.EphemeralRunner{}
|
||||
originalJson, err := json.Marshal(original)
|
||||
if err != nil {
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -38,6 +39,9 @@ func NewAutoScalerClient(
|
||||
runnerScaleSetId int,
|
||||
options ...func(*AutoScalerClient),
|
||||
) (*AutoScalerClient, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "NewAutoScalerClient")
|
||||
defer span.End()
|
||||
|
||||
listener := AutoScalerClient{
|
||||
logger: logger.WithName("auto_scaler"),
|
||||
}
|
||||
@@ -59,6 +63,9 @@ func NewAutoScalerClient(
|
||||
}
|
||||
|
||||
func createSession(ctx context.Context, logger *logr.Logger, client actions.ActionsService, runnerScaleSetId int) (*actions.RunnerScaleSetSession, *actions.RunnerScaleSetMessage, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "createSession")
|
||||
defer span.End()
|
||||
|
||||
hostName, err := os.Hostname()
|
||||
if err != nil {
|
||||
hostName = uuid.New().String()
|
||||
@@ -130,6 +137,9 @@ func (m *AutoScalerClient) Close() error {
|
||||
}
|
||||
|
||||
func (m *AutoScalerClient) GetRunnerScaleSetMessage(ctx context.Context, handler func(msg *actions.RunnerScaleSetMessage) error, maxCapacity int) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "AutoScalerClient.GetRunnerScaleSetMessage")
|
||||
defer span.End()
|
||||
|
||||
if m.initialMessage != nil {
|
||||
err := handler(m.initialMessage)
|
||||
if err != nil {
|
||||
@@ -162,6 +172,9 @@ func (m *AutoScalerClient) GetRunnerScaleSetMessage(ctx context.Context, handler
|
||||
}
|
||||
|
||||
func (m *AutoScalerClient) deleteMessage(ctx context.Context, messageId int64) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "AutoScalerClient.deleteMessage")
|
||||
defer span.End()
|
||||
|
||||
err := m.client.DeleteMessage(ctx, messageId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete message failed from refreshing client. %w", err)
|
||||
@@ -172,6 +185,9 @@ func (m *AutoScalerClient) deleteMessage(ctx context.Context, messageId int64) e
|
||||
}
|
||||
|
||||
func (m *AutoScalerClient) AcquireJobsForRunnerScaleSet(ctx context.Context, requestIds []int64) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "AutoScalerClient.AcquireJobsForRunnerScaleSet")
|
||||
defer span.End()
|
||||
|
||||
m.logger.Info("acquiring jobs.", "request count", len(requestIds), "requestIds", fmt.Sprint(requestIds))
|
||||
if len(requestIds) == 0 {
|
||||
return nil
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/actions/actions-runner-controller/cmd/githubrunnerscalesetlistener/config"
|
||||
"github.com/actions/actions-runner-controller/github/actions"
|
||||
"github.com/go-logr/logr"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
type ScaleSettings struct {
|
||||
@@ -60,6 +61,9 @@ func NewService(
|
||||
settings *ScaleSettings,
|
||||
options ...func(*Service),
|
||||
) (*Service, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "NewService")
|
||||
defer span.End()
|
||||
|
||||
s := &Service{
|
||||
ctx: ctx,
|
||||
rsClient: rsClient,
|
||||
|
||||
@@ -34,6 +34,7 @@ import (
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"go.opentelemetry.io/otel"
|
||||
"golang.org/x/net/http/httpproxy"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
@@ -155,6 +156,9 @@ type runOptions struct {
|
||||
}
|
||||
|
||||
func run(ctx context.Context, rc config.Config, logger logr.Logger, opts runOptions) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "run")
|
||||
defer span.End()
|
||||
|
||||
// Create root context and hook with sigint and sigterm
|
||||
creds := &actions.ActionsAuth{}
|
||||
if rc.Token != "" {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
context "context"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
// MockKubernetesManager is an autogenerated mock type for the KubernetesManager type
|
||||
@@ -15,6 +16,9 @@ type MockKubernetesManager struct {
|
||||
|
||||
// ScaleEphemeralRunnerSet provides a mock function with given fields: ctx, namespace, resourceName, runnerCount
|
||||
func (_m *MockKubernetesManager) ScaleEphemeralRunnerSet(ctx context.Context, namespace string, resourceName string, runnerCount int) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "MockKubernetesManager.ScaleEphemeralRunnerSet")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, namespace, resourceName, runnerCount)
|
||||
|
||||
var r0 error
|
||||
@@ -29,6 +33,9 @@ func (_m *MockKubernetesManager) ScaleEphemeralRunnerSet(ctx context.Context, na
|
||||
|
||||
// UpdateEphemeralRunnerWithJobInfo provides a mock function with given fields: ctx, namespace, resourceName, ownerName, repositoryName, jobWorkflowRef, jobDisplayName, jobRequestId, workflowRunId
|
||||
func (_m *MockKubernetesManager) UpdateEphemeralRunnerWithJobInfo(ctx context.Context, namespace string, resourceName string, ownerName string, repositoryName string, jobWorkflowRef string, jobDisplayName string, jobRequestId int64, workflowRunId int64) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "MockKubernetesManager.UpdateEphemeralRunnerWithJobInfo")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, namespace, resourceName, ownerName, repositoryName, jobWorkflowRef, jobDisplayName, jobRequestId, workflowRunId)
|
||||
|
||||
var r0 error
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
context "context"
|
||||
|
||||
actions "github.com/actions/actions-runner-controller/github/actions"
|
||||
"go.opentelemetry.io/otel"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
@@ -17,6 +18,9 @@ type MockRunnerScaleSetClient struct {
|
||||
|
||||
// AcquireJobsForRunnerScaleSet provides a mock function with given fields: ctx, requestIds
|
||||
func (_m *MockRunnerScaleSetClient) AcquireJobsForRunnerScaleSet(ctx context.Context, requestIds []int64) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "MockRunnerScaleSetClient.AcquireJobsForRunnerScaleSet")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, requestIds)
|
||||
|
||||
var r0 error
|
||||
@@ -31,6 +35,9 @@ func (_m *MockRunnerScaleSetClient) AcquireJobsForRunnerScaleSet(ctx context.Con
|
||||
|
||||
// GetRunnerScaleSetMessage provides a mock function with given fields: ctx, handler, maxCapacity
|
||||
func (_m *MockRunnerScaleSetClient) GetRunnerScaleSetMessage(ctx context.Context, handler func(*actions.RunnerScaleSetMessage) error, maxCapacity int) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "MockRunnerScaleSetClient.GetRunnerScaleSetMessage")
|
||||
defer span.End()
|
||||
|
||||
ret := _m.Called(ctx, handler, maxCapacity)
|
||||
|
||||
var r0 error
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/actions/actions-runner-controller/github/actions"
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/pkg/errors"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
type SessionRefreshingClient struct {
|
||||
@@ -25,6 +26,9 @@ func newSessionClient(client actions.ActionsService, logger *logr.Logger, sessio
|
||||
}
|
||||
|
||||
func (m *SessionRefreshingClient) GetMessage(ctx context.Context, lastMessageId int64, maxCapacity int) (*actions.RunnerScaleSetMessage, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "SessionRefreshingClient.GetMessage")
|
||||
defer span.End()
|
||||
|
||||
if maxCapacity < 0 {
|
||||
return nil, fmt.Errorf("maxCapacity must be greater than or equal to 0")
|
||||
}
|
||||
@@ -55,6 +59,9 @@ func (m *SessionRefreshingClient) GetMessage(ctx context.Context, lastMessageId
|
||||
}
|
||||
|
||||
func (m *SessionRefreshingClient) DeleteMessage(ctx context.Context, messageId int64) error {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "SessionRefreshingClient.DeleteMessage")
|
||||
defer span.End()
|
||||
|
||||
err := m.client.DeleteMessage(ctx, m.session.MessageQueueUrl, m.session.MessageQueueAccessToken, messageId)
|
||||
if err == nil {
|
||||
return nil
|
||||
@@ -82,6 +89,9 @@ func (m *SessionRefreshingClient) DeleteMessage(ctx context.Context, messageId i
|
||||
}
|
||||
|
||||
func (m *SessionRefreshingClient) AcquireJobs(ctx context.Context, requestIds []int64) ([]int64, error) {
|
||||
ctx, span := otel.Tracer("arc").Start(ctx, "SessionRefreshingClient.AcquireJobs")
|
||||
defer span.End()
|
||||
|
||||
ids, err := m.client.AcquireJobs(ctx, m.session.RunnerScaleSet.Id, m.session.MessageQueueAccessToken, requestIds)
|
||||
if err == nil {
|
||||
return ids, nil
|
||||
|
||||
Reference in New Issue
Block a user