runner/internal/app/poll/poller.go
caicandong 49a2fcc138 fix endless loop (#306)
fix endless loop in  poll
relate #305

Co-authored-by: CaiCandong <1290147055@qq.com>
Reviewed-on: https://gitea.com/gitea/act_runner/pulls/306
Co-authored-by: caicandong <caicandong@noreply.gitea.com>
Co-committed-by: caicandong <caicandong@noreply.gitea.com>
2023-07-24 07:07:53 +00:00

93 lines
2.1 KiB
Go

// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package poll
import (
"context"
"errors"
"fmt"
"sync"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"github.com/bufbuild/connect-go"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"gitea.com/gitea/act_runner/internal/app/run"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
)
type Poller struct {
client client.Client
runner *run.Runner
cfg *config.Config
}
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
return &Poller{
client: client,
runner: runner,
cfg: cfg,
}
}
func (p *Poller) Poll(ctx context.Context) {
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
wg := &sync.WaitGroup{}
for i := 0; i < p.cfg.Runner.Capacity; i++ {
wg.Add(1)
go p.poll(ctx, wg, limiter)
}
wg.Wait()
}
func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) {
defer wg.Done()
for {
if err := limiter.Wait(ctx); err != nil {
if ctx.Err() != nil {
log.WithError(err).Debug("limiter wait failed")
}
return
}
task, ok := p.fetchTask(ctx)
if !ok {
continue
}
p.runTaskWithRecover(ctx, task)
}
}
func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("panic: %v", r)
log.WithError(err).Error("panic in runTaskWithRecover")
}
}()
if err := p.runner.Run(ctx, task); err != nil {
log.WithError(err).Error("failed to run task")
}
}
func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
defer cancel()
resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{}))
if errors.Is(err, context.DeadlineExceeded) {
err = nil
}
if err != nil {
log.WithError(err).Error("failed to fetch task")
return nil, false
}
if resp == nil || resp.Msg == nil || resp.Msg.Task == nil {
return nil, false
}
return resp.Msg.Task, true
}