Compare commits
2 commits
main
...
wip-gracef
Author | SHA1 | Date | |
---|---|---|---|
|
31b3de2c96 | ||
|
30029567f7 |
6 changed files with 94 additions and 20 deletions
|
@ -1,7 +1,8 @@
|
|||
# Release Notes
|
||||
|
||||
## 3.4.2
|
||||
## 3.5.0
|
||||
|
||||
* [Allow graceful shutdowns](https://code.forgejo.org/forgejo/runner/pulls/201): when receiving a signal (INT or TERM) wait for running jobs to complete (up to shutdown_timeout).
|
||||
* [Fix label declaration](https://code.forgejo.org/forgejo/runner/pulls/176): Runner in daemon mode now takes labels found in config.yml into account when declaration was successful.
|
||||
* [Fix the docker compose example](https://code.forgejo.org/forgejo/runner/pulls/175) to workaround the race on labels.
|
||||
* [Fix the kubernetes dind example](https://code.forgejo.org/forgejo/runner/pulls/169).
|
||||
|
|
|
@ -120,8 +120,18 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command,
|
|||
|
||||
poller := poll.New(cfg, cli, runner)
|
||||
|
||||
poller.Poll(ctx)
|
||||
go poller.Poll()
|
||||
|
||||
<-ctx.Done()
|
||||
log.Infof("runner: %s shutdown initiated, waiting %s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout)
|
||||
defer cancel()
|
||||
|
||||
err = poller.Shutdown(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,40 +25,95 @@ type Poller struct {
|
|||
runner *run.Runner
|
||||
cfg *config.Config
|
||||
tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
|
||||
|
||||
pollingCtx context.Context
|
||||
shutdownPolling context.CancelFunc
|
||||
|
||||
jobsCtx context.Context
|
||||
shutdownJobs context.CancelFunc
|
||||
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
|
||||
pollingCtx, shutdownPolling := context.WithCancel(context.Background())
|
||||
|
||||
jobsCtx, shutdownJobs := context.WithCancel(context.Background())
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
return &Poller{
|
||||
client: client,
|
||||
runner: runner,
|
||||
cfg: cfg,
|
||||
|
||||
pollingCtx: pollingCtx,
|
||||
shutdownPolling: shutdownPolling,
|
||||
|
||||
jobsCtx: jobsCtx,
|
||||
shutdownJobs: shutdownJobs,
|
||||
|
||||
done: done,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) Poll(ctx context.Context) {
|
||||
func (p *Poller) Poll() {
|
||||
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
|
||||
wg := &sync.WaitGroup{}
|
||||
for i := 0; i < p.cfg.Runner.Capacity; i++ {
|
||||
wg.Add(1)
|
||||
go p.poll(ctx, wg, limiter)
|
||||
go p.poll(wg, limiter)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// signal that we shutdown
|
||||
close(p.done)
|
||||
}
|
||||
|
||||
func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) {
|
||||
func (p *Poller) Shutdown(ctx context.Context) error {
|
||||
p.shutdownPolling()
|
||||
|
||||
select {
|
||||
// graceful shutdown completed succesfully
|
||||
case <-p.done:
|
||||
return nil
|
||||
|
||||
// our timeout for shutting down ran out
|
||||
case <-ctx.Done():
|
||||
// when both the timeout fires and the graceful shutdown
|
||||
// completed succsfully, this branch of the select may
|
||||
// fire. Do a non-blocking check here against the graceful
|
||||
// shutdown status to avoid sending an error if we don't need to.
|
||||
_, ok := <-p.done
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// force a shutdown of all running jobs
|
||||
p.shutdownJobs()
|
||||
|
||||
// wait for running jobs to report their status to Gitea
|
||||
_, _ = <-p.done
|
||||
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
if err := limiter.Wait(p.pollingCtx); err != nil {
|
||||
if p.pollingCtx.Err() != nil {
|
||||
log.WithError(err).Debug("limiter wait failed")
|
||||
}
|
||||
return
|
||||
}
|
||||
task, ok := p.fetchTask(ctx)
|
||||
task, ok := p.fetchTask(p.pollingCtx)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
p.runTaskWithRecover(ctx, task)
|
||||
|
||||
p.runTaskWithRecover(p.jobsCtx, task)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,11 @@ runner:
|
|||
# Please note that the Forgejo instance also has a timeout (3h by default) for the job.
|
||||
# So the job could be stopped by the Forgejo instance if it's timeout is shorter than this.
|
||||
timeout: 3h
|
||||
# Whether skip verifying the TLS certificate of the Forgejo instance.
|
||||
# The timeout for the runner to wait for running jobs to finish when shutting down.
|
||||
# Any running jobs that haven't finished after this timeout will be cancelled.
|
||||
# Defaults to the same value as timeout if unset or 0.
|
||||
shutdown_timeout: 3h
|
||||
# Whether skip verifying the TLS certificate of the instance.
|
||||
insecure: false
|
||||
# The timeout for fetching the job from the Forgejo instance.
|
||||
fetch_timeout: 5s
|
||||
|
|
|
@ -21,15 +21,16 @@ type Log struct {
|
|||
|
||||
// Runner represents the configuration for the runner.
|
||||
type Runner struct {
|
||||
File string `yaml:"file"` // File specifies the file path for the runner.
|
||||
Capacity int `yaml:"capacity"` // Capacity specifies the capacity of the runner.
|
||||
Envs map[string]string `yaml:"envs"` // Envs stores environment variables for the runner.
|
||||
EnvFile string `yaml:"env_file"` // EnvFile specifies the path to the file containing environment variables for the runner.
|
||||
Timeout time.Duration `yaml:"timeout"` // Timeout specifies the duration for runner timeout.
|
||||
Insecure bool `yaml:"insecure"` // Insecure indicates whether the runner operates in an insecure mode.
|
||||
FetchTimeout time.Duration `yaml:"fetch_timeout"` // FetchTimeout specifies the timeout duration for fetching resources.
|
||||
FetchInterval time.Duration `yaml:"fetch_interval"` // FetchInterval specifies the interval duration for fetching resources.
|
||||
Labels []string `yaml:"labels"` // Labels specifies the labels of the runner. Labels are declared on each startup
|
||||
File string `yaml:"file"` // File specifies the file path for the runner.
|
||||
Capacity int `yaml:"capacity"` // Capacity specifies the capacity of the runner.
|
||||
Envs map[string]string `yaml:"envs"` // Envs stores environment variables for the runner.
|
||||
EnvFile string `yaml:"env_file"` // EnvFile specifies the path to the file containing environment variables for the runner.
|
||||
Timeout time.Duration `yaml:"timeout"` // Timeout specifies the duration for runner timeout.
|
||||
ShutdownTimeout time.Duration `yaml:"shutdown_timeout"` // ShutdownTimeout specifies the duration to wait for running jobs to complete during a shutdown of the runner.
|
||||
Insecure bool `yaml:"insecure"` // Insecure indicates whether the runner operates in an insecure mode.
|
||||
FetchTimeout time.Duration `yaml:"fetch_timeout"` // FetchTimeout specifies the timeout duration for fetching resources.
|
||||
FetchInterval time.Duration `yaml:"fetch_interval"` // FetchInterval specifies the interval duration for fetching resources.
|
||||
Labels []string `yaml:"labels"` // Labels specify the labels of the runner. Labels are declared on each startup
|
||||
}
|
||||
|
||||
// Cache represents the configuration for caching.
|
||||
|
@ -120,6 +121,9 @@ func LoadDefault(file string) (*Config, error) {
|
|||
if cfg.Runner.Timeout <= 0 {
|
||||
cfg.Runner.Timeout = 3 * time.Hour
|
||||
}
|
||||
if cfg.Runner.ShutdownTimeout <= 0 {
|
||||
cfg.Runner.ShutdownTimeout = cfg.Runner.Timeout
|
||||
}
|
||||
if cfg.Cache.Enabled == nil {
|
||||
b := true
|
||||
cfg.Cache.Enabled = &b
|
||||
|
|
|
@ -45,4 +45,4 @@ fi
|
|||
# Prevent reading the token from the forgejo-runner process
|
||||
unset GITEA_RUNNER_REGISTRATION_TOKEN
|
||||
|
||||
forgejo-runner daemon ${CONFIG_ARG}
|
||||
exec forgejo-runner daemon ${CONFIG_ARG}
|
||||
|
|
Loading…
Reference in a new issue