feat: allow graceful shutdowns (#546)
Add a `Shutdown(context.Context) error` method to the Poller. Calling this method will first shutdown all active polling, preventing any new jobs from spawning. It will then wait for either all jobs to finish, or for the context to be cancelled. If the context is cancelled, it will then force all jobs to end, and then exit. Fixes https://gitea.com/gitea/act_runner/issues/107 Co-authored-by: Rowan Bohde <rowan.bohde@gmail.com> Reviewed-on: https://gitea.com/gitea/act_runner/pulls/546 Reviewed-by: Jason Song <i@wolfogre.com> Reviewed-by: Lunny Xiao <xiaolunwen@gmail.com> Co-authored-by: rowan-allspice <rowan-allspice@noreply.gitea.com> Co-committed-by: rowan-allspice <rowan-allspice@noreply.gitea.com> (cherry picked from commit d1d3cad4b0bfd0bfc5df344306f304043ff63223) Conflicts: internal/pkg/config/config.example.yaml internal/pkg/config/config.go scripts/run.sh trivial context conflicts
This commit is contained in:
parent
1b95689795
commit
30029567f7
6 changed files with 90 additions and 20 deletions
|
@ -1,7 +1,8 @@
|
||||||
# Release Notes
|
# Release Notes
|
||||||
|
|
||||||
## 3.4.2
|
## 3.5.0
|
||||||
|
|
||||||
|
* [Allow graceful shutdowns](https://code.forgejo.org/forgejo/runner/pulls/201): when receiving a signal (INT or TERM) wait for running jobs to complete (up to shutdown_timeout).
|
||||||
* [Fix label declaration](https://code.forgejo.org/forgejo/runner/pulls/176): Runner in daemon mode now takes labels found in config.yml into account when declaration was successful.
|
* [Fix label declaration](https://code.forgejo.org/forgejo/runner/pulls/176): Runner in daemon mode now takes labels found in config.yml into account when declaration was successful.
|
||||||
* [Fix the docker compose example](https://code.forgejo.org/forgejo/runner/pulls/175) to workaround the race on labels.
|
* [Fix the docker compose example](https://code.forgejo.org/forgejo/runner/pulls/175) to workaround the race on labels.
|
||||||
* [Fix the kubernetes dind example](https://code.forgejo.org/forgejo/runner/pulls/169).
|
* [Fix the kubernetes dind example](https://code.forgejo.org/forgejo/runner/pulls/169).
|
||||||
|
|
|
@ -120,8 +120,18 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command,
|
||||||
|
|
||||||
poller := poll.New(cfg, cli, runner)
|
poller := poll.New(cfg, cli, runner)
|
||||||
|
|
||||||
poller.Poll(ctx)
|
go poller.Poll()
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
log.Infof("runner: %s shutdown initiated, waiting %s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
err = poller.Shutdown(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,40 +25,95 @@ type Poller struct {
|
||||||
runner *run.Runner
|
runner *run.Runner
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
|
tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
|
||||||
|
|
||||||
|
pollingCtx context.Context
|
||||||
|
shutdownPolling context.CancelFunc
|
||||||
|
|
||||||
|
jobsCtx context.Context
|
||||||
|
shutdownJobs context.CancelFunc
|
||||||
|
|
||||||
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
|
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
|
||||||
|
pollingCtx, shutdownPolling := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
jobsCtx, shutdownJobs := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
return &Poller{
|
return &Poller{
|
||||||
client: client,
|
client: client,
|
||||||
runner: runner,
|
runner: runner,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
|
|
||||||
|
pollingCtx: pollingCtx,
|
||||||
|
shutdownPolling: shutdownPolling,
|
||||||
|
|
||||||
|
jobsCtx: jobsCtx,
|
||||||
|
shutdownJobs: shutdownJobs,
|
||||||
|
|
||||||
|
done: done,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Poller) Poll(ctx context.Context) {
|
func (p *Poller) Poll() {
|
||||||
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
|
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
for i := 0; i < p.cfg.Runner.Capacity; i++ {
|
for i := 0; i < p.cfg.Runner.Capacity; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go p.poll(ctx, wg, limiter)
|
go p.poll(wg, limiter)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
// signal that we shutdown
|
||||||
|
close(p.done)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) {
|
func (p *Poller) Shutdown(ctx context.Context) error {
|
||||||
|
p.shutdownPolling()
|
||||||
|
|
||||||
|
select {
|
||||||
|
// graceful shutdown completed succesfully
|
||||||
|
case <-p.done:
|
||||||
|
return nil
|
||||||
|
|
||||||
|
// our timeout for shutting down ran out
|
||||||
|
case <-ctx.Done():
|
||||||
|
// when both the timeout fires and the graceful shutdown
|
||||||
|
// completed succsfully, this branch of the select may
|
||||||
|
// fire. Do a non-blocking check here against the graceful
|
||||||
|
// shutdown status to avoid sending an error if we don't need to.
|
||||||
|
_, ok := <-p.done
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// force a shutdown of all running jobs
|
||||||
|
p.shutdownJobs()
|
||||||
|
|
||||||
|
// wait for running jobs to report their status to Gitea
|
||||||
|
_, _ = <-p.done
|
||||||
|
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for {
|
for {
|
||||||
if err := limiter.Wait(ctx); err != nil {
|
if err := limiter.Wait(p.pollingCtx); err != nil {
|
||||||
if ctx.Err() != nil {
|
if p.pollingCtx.Err() != nil {
|
||||||
log.WithError(err).Debug("limiter wait failed")
|
log.WithError(err).Debug("limiter wait failed")
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
task, ok := p.fetchTask(ctx)
|
task, ok := p.fetchTask(p.pollingCtx)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
p.runTaskWithRecover(ctx, task)
|
|
||||||
|
p.runTaskWithRecover(p.jobsCtx, task)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,10 @@ runner:
|
||||||
# Please note that the Forgejo instance also has a timeout (3h by default) for the job.
|
# Please note that the Forgejo instance also has a timeout (3h by default) for the job.
|
||||||
# So the job could be stopped by the Forgejo instance if it's timeout is shorter than this.
|
# So the job could be stopped by the Forgejo instance if it's timeout is shorter than this.
|
||||||
timeout: 3h
|
timeout: 3h
|
||||||
# Whether skip verifying the TLS certificate of the Forgejo instance.
|
# The timeout for the runner to wait for running jobs to finish when shutting down.
|
||||||
|
# Any running jobs that haven't finished after this timeout will be cancelled.
|
||||||
|
shutdown_timeout: 0s
|
||||||
|
# Whether skip verifying the TLS certificate of the instance.
|
||||||
insecure: false
|
insecure: false
|
||||||
# The timeout for fetching the job from the Forgejo instance.
|
# The timeout for fetching the job from the Forgejo instance.
|
||||||
fetch_timeout: 5s
|
fetch_timeout: 5s
|
||||||
|
|
|
@ -21,15 +21,16 @@ type Log struct {
|
||||||
|
|
||||||
// Runner represents the configuration for the runner.
|
// Runner represents the configuration for the runner.
|
||||||
type Runner struct {
|
type Runner struct {
|
||||||
File string `yaml:"file"` // File specifies the file path for the runner.
|
File string `yaml:"file"` // File specifies the file path for the runner.
|
||||||
Capacity int `yaml:"capacity"` // Capacity specifies the capacity of the runner.
|
Capacity int `yaml:"capacity"` // Capacity specifies the capacity of the runner.
|
||||||
Envs map[string]string `yaml:"envs"` // Envs stores environment variables for the runner.
|
Envs map[string]string `yaml:"envs"` // Envs stores environment variables for the runner.
|
||||||
EnvFile string `yaml:"env_file"` // EnvFile specifies the path to the file containing environment variables for the runner.
|
EnvFile string `yaml:"env_file"` // EnvFile specifies the path to the file containing environment variables for the runner.
|
||||||
Timeout time.Duration `yaml:"timeout"` // Timeout specifies the duration for runner timeout.
|
Timeout time.Duration `yaml:"timeout"` // Timeout specifies the duration for runner timeout.
|
||||||
Insecure bool `yaml:"insecure"` // Insecure indicates whether the runner operates in an insecure mode.
|
ShutdownTimeout time.Duration `yaml:"shutdown_timeout"` // ShutdownTimeout specifies the duration to wait for running jobs to complete during a shutdown of the runner.
|
||||||
FetchTimeout time.Duration `yaml:"fetch_timeout"` // FetchTimeout specifies the timeout duration for fetching resources.
|
Insecure bool `yaml:"insecure"` // Insecure indicates whether the runner operates in an insecure mode.
|
||||||
FetchInterval time.Duration `yaml:"fetch_interval"` // FetchInterval specifies the interval duration for fetching resources.
|
FetchTimeout time.Duration `yaml:"fetch_timeout"` // FetchTimeout specifies the timeout duration for fetching resources.
|
||||||
Labels []string `yaml:"labels"` // Labels specifies the labels of the runner. Labels are declared on each startup
|
FetchInterval time.Duration `yaml:"fetch_interval"` // FetchInterval specifies the interval duration for fetching resources.
|
||||||
|
Labels []string `yaml:"labels"` // Labels specify the labels of the runner. Labels are declared on each startup
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache represents the configuration for caching.
|
// Cache represents the configuration for caching.
|
||||||
|
|
|
@ -45,4 +45,4 @@ fi
|
||||||
# Prevent reading the token from the forgejo-runner process
|
# Prevent reading the token from the forgejo-runner process
|
||||||
unset GITEA_RUNNER_REGISTRATION_TOKEN
|
unset GITEA_RUNNER_REGISTRATION_TOKEN
|
||||||
|
|
||||||
forgejo-runner daemon ${CONFIG_ARG}
|
exec forgejo-runner daemon ${CONFIG_ARG}
|
||||||
|
|
Loading…
Reference in a new issue