mirror of
https://github.com/actions/actions-runner-controller.git
synced 2025-12-15 06:26:57 +00:00
Propagate max capacity information to the actions back-end (#3431)
This commit is contained in:
@@ -31,7 +31,7 @@ const (
|
||||
type Client interface {
|
||||
GetAcquirableJobs(ctx context.Context, runnerScaleSetId int) (*actions.AcquirableJobList, error)
|
||||
CreateMessageSession(ctx context.Context, runnerScaleSetId int, owner string) (*actions.RunnerScaleSetSession, error)
|
||||
GetMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, lastMessageId int64) (*actions.RunnerScaleSetMessage, error)
|
||||
GetMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, lastMessageId int64, maxCapacity int) (*actions.RunnerScaleSetMessage, error)
|
||||
DeleteMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, messageId int64) error
|
||||
AcquireJobs(ctx context.Context, runnerScaleSetId int, messageQueueAccessToken string, requestIds []int64) ([]int64, error)
|
||||
RefreshMessageSession(ctx context.Context, runnerScaleSetId int, sessionId *uuid.UUID) (*actions.RunnerScaleSetSession, error)
|
||||
@@ -80,6 +80,7 @@ type Listener struct {
|
||||
|
||||
// updated fields
|
||||
lastMessageID int64 // The ID of the last processed message.
|
||||
maxCapacity int // The maximum number of runners that can be created.
|
||||
session *actions.RunnerScaleSetSession // The session for managing the runner scale set.
|
||||
}
|
||||
|
||||
@@ -89,10 +90,11 @@ func New(config Config) (*Listener, error) {
|
||||
}
|
||||
|
||||
listener := &Listener{
|
||||
scaleSetID: config.ScaleSetID,
|
||||
client: config.Client,
|
||||
logger: config.Logger,
|
||||
metrics: metrics.Discard,
|
||||
scaleSetID: config.ScaleSetID,
|
||||
client: config.Client,
|
||||
logger: config.Logger,
|
||||
metrics: metrics.Discard,
|
||||
maxCapacity: config.MaxRunners,
|
||||
}
|
||||
|
||||
if config.Metrics != nil {
|
||||
@@ -267,7 +269,7 @@ func (l *Listener) createSession(ctx context.Context) error {
|
||||
|
||||
func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessage, error) {
|
||||
l.logger.Info("Getting next message", "lastMessageID", l.lastMessageID)
|
||||
msg, err := l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID)
|
||||
msg, err := l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID, l.maxCapacity)
|
||||
if err == nil { // if NO error
|
||||
return msg, nil
|
||||
}
|
||||
@@ -283,7 +285,7 @@ func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessa
|
||||
|
||||
l.logger.Info("Getting next message", "lastMessageID", l.lastMessageID)
|
||||
|
||||
msg, err = l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID)
|
||||
msg, err = l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID, l.maxCapacity)
|
||||
if err != nil { // if NO error
|
||||
return nil, fmt.Errorf("failed to get next message after message session refresh: %w", err)
|
||||
}
|
||||
|
||||
@@ -123,13 +123,14 @@ func TestListener_getMessage(t *testing.T) {
|
||||
config := Config{
|
||||
ScaleSetID: 1,
|
||||
Metrics: metrics.Discard,
|
||||
MaxRunners: 10,
|
||||
}
|
||||
|
||||
client := listenermocks.NewClient(t)
|
||||
want := &actions.RunnerScaleSetMessage{
|
||||
MessageId: 1,
|
||||
}
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(want, nil).Once()
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(want, nil).Once()
|
||||
config.Client = client
|
||||
|
||||
l, err := New(config)
|
||||
@@ -148,10 +149,11 @@ func TestListener_getMessage(t *testing.T) {
|
||||
config := Config{
|
||||
ScaleSetID: 1,
|
||||
Metrics: metrics.Discard,
|
||||
MaxRunners: 10,
|
||||
}
|
||||
|
||||
client := listenermocks.NewClient(t)
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.HttpClientSideError{Code: http.StatusNotFound}).Once()
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(nil, &actions.HttpClientSideError{Code: http.StatusNotFound}).Once()
|
||||
config.Client = client
|
||||
|
||||
l, err := New(config)
|
||||
@@ -170,6 +172,7 @@ func TestListener_getMessage(t *testing.T) {
|
||||
config := Config{
|
||||
ScaleSetID: 1,
|
||||
Metrics: metrics.Discard,
|
||||
MaxRunners: 10,
|
||||
}
|
||||
|
||||
client := listenermocks.NewClient(t)
|
||||
@@ -185,12 +188,12 @@ func TestListener_getMessage(t *testing.T) {
|
||||
}
|
||||
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()
|
||||
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Once()
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(nil, &actions.MessageQueueTokenExpiredError{}).Once()
|
||||
|
||||
want := &actions.RunnerScaleSetMessage{
|
||||
MessageId: 1,
|
||||
}
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(want, nil).Once()
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(want, nil).Once()
|
||||
|
||||
config.Client = client
|
||||
|
||||
@@ -214,6 +217,7 @@ func TestListener_getMessage(t *testing.T) {
|
||||
config := Config{
|
||||
ScaleSetID: 1,
|
||||
Metrics: metrics.Discard,
|
||||
MaxRunners: 10,
|
||||
}
|
||||
|
||||
client := listenermocks.NewClient(t)
|
||||
@@ -229,7 +233,7 @@ func TestListener_getMessage(t *testing.T) {
|
||||
}
|
||||
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()
|
||||
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Twice()
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(nil, &actions.MessageQueueTokenExpiredError{}).Twice()
|
||||
|
||||
config.Client = client
|
||||
|
||||
@@ -450,6 +454,7 @@ func TestListener_Listen(t *testing.T) {
|
||||
config := Config{
|
||||
ScaleSetID: 1,
|
||||
Metrics: metrics.Discard,
|
||||
MaxRunners: 10,
|
||||
}
|
||||
|
||||
client := listenermocks.NewClient(t)
|
||||
@@ -470,7 +475,7 @@ func TestListener_Listen(t *testing.T) {
|
||||
MessageType: "RunnerScaleSetJobMessages",
|
||||
Statistics: &actions.RunnerScaleSetStatistic{},
|
||||
}
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).
|
||||
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).
|
||||
Return(msg, nil).
|
||||
Run(
|
||||
func(mock.Arguments) {
|
||||
|
||||
@@ -123,25 +123,25 @@ func (_m *Client) GetAcquirableJobs(ctx context.Context, runnerScaleSetId int) (
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// GetMessage provides a mock function with given fields: ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId
|
||||
func (_m *Client) GetMessage(ctx context.Context, messageQueueUrl string, messageQueueAccessToken string, lastMessageId int64) (*actions.RunnerScaleSetMessage, error) {
|
||||
ret := _m.Called(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId)
|
||||
// GetMessage provides a mock function with given fields: ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId, maxCapacity
|
||||
func (_m *Client) GetMessage(ctx context.Context, messageQueueUrl string, messageQueueAccessToken string, lastMessageId int64, maxCapacity int) (*actions.RunnerScaleSetMessage, error) {
|
||||
ret := _m.Called(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId, maxCapacity)
|
||||
|
||||
var r0 *actions.RunnerScaleSetMessage
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) (*actions.RunnerScaleSetMessage, error)); ok {
|
||||
return rf(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId)
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, int64, int) (*actions.RunnerScaleSetMessage, error)); ok {
|
||||
return rf(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId, maxCapacity)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) *actions.RunnerScaleSetMessage); ok {
|
||||
r0 = rf(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId)
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, int64, int) *actions.RunnerScaleSetMessage); ok {
|
||||
r0 = rf(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId, maxCapacity)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*actions.RunnerScaleSetMessage)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, string, int64) error); ok {
|
||||
r1 = rf(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId)
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, string, int64, int) error); ok {
|
||||
r1 = rf(ctx, messageQueueUrl, messageQueueAccessToken, lastMessageId, maxCapacity)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user