Compare commits
2 commits
main
...
wip-gracef
Author | SHA1 | Date | |
---|---|---|---|
|
31b3de2c96 | ||
|
30029567f7 |
6 changed files with 94 additions and 20 deletions
|
@ -1,7 +1,8 @@
|
||||||
# Release Notes
|
# Release Notes
|
||||||
|
|
||||||
## 3.4.2
|
## 3.5.0
|
||||||
|
|
||||||
|
* [Allow graceful shutdowns](https://code.forgejo.org/forgejo/runner/pulls/201): when receiving a signal (INT or TERM) wait for running jobs to complete (up to shutdown_timeout).
|
||||||
* [Fix label declaration](https://code.forgejo.org/forgejo/runner/pulls/176): Runner in daemon mode now takes labels found in config.yml into account when declaration was successful.
|
* [Fix label declaration](https://code.forgejo.org/forgejo/runner/pulls/176): Runner in daemon mode now takes labels found in config.yml into account when declaration was successful.
|
||||||
* [Fix the docker compose example](https://code.forgejo.org/forgejo/runner/pulls/175) to workaround the race on labels.
|
* [Fix the docker compose example](https://code.forgejo.org/forgejo/runner/pulls/175) to workaround the race on labels.
|
||||||
* [Fix the kubernetes dind example](https://code.forgejo.org/forgejo/runner/pulls/169).
|
* [Fix the kubernetes dind example](https://code.forgejo.org/forgejo/runner/pulls/169).
|
||||||
|
|
|
@ -120,8 +120,18 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command,
|
||||||
|
|
||||||
poller := poll.New(cfg, cli, runner)
|
poller := poll.New(cfg, cli, runner)
|
||||||
|
|
||||||
poller.Poll(ctx)
|
go poller.Poll()
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
log.Infof("runner: %s shutdown initiated, waiting %s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
err = poller.Shutdown(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,40 +25,95 @@ type Poller struct {
|
||||||
runner *run.Runner
|
runner *run.Runner
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
|
tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
|
||||||
|
|
||||||
|
pollingCtx context.Context
|
||||||
|
shutdownPolling context.CancelFunc
|
||||||
|
|
||||||
|
jobsCtx context.Context
|
||||||
|
shutdownJobs context.CancelFunc
|
||||||
|
|
||||||
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
|
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
|
||||||
|
pollingCtx, shutdownPolling := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
jobsCtx, shutdownJobs := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
return &Poller{
|
return &Poller{
|
||||||
client: client,
|
client: client,
|
||||||
runner: runner,
|
runner: runner,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
|
|
||||||
|
pollingCtx: pollingCtx,
|
||||||
|
shutdownPolling: shutdownPolling,
|
||||||
|
|
||||||
|
jobsCtx: jobsCtx,
|
||||||
|
shutdownJobs: shutdownJobs,
|
||||||
|
|
||||||
|
done: done,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Poller) Poll(ctx context.Context) {
|
func (p *Poller) Poll() {
|
||||||
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
|
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
for i := 0; i < p.cfg.Runner.Capacity; i++ {
|
for i := 0; i < p.cfg.Runner.Capacity; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go p.poll(ctx, wg, limiter)
|
go p.poll(wg, limiter)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
// signal that we shutdown
|
||||||
|
close(p.done)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) {
|
func (p *Poller) Shutdown(ctx context.Context) error {
|
||||||
|
p.shutdownPolling()
|
||||||
|
|
||||||
|
select {
|
||||||
|
// graceful shutdown completed succesfully
|
||||||
|
case <-p.done:
|
||||||
|
return nil
|
||||||
|
|
||||||
|
// our timeout for shutting down ran out
|
||||||
|
case <-ctx.Done():
|
||||||
|
// when both the timeout fires and the graceful shutdown
|
||||||
|
// completed succsfully, this branch of the select may
|
||||||
|
// fire. Do a non-blocking check here against the graceful
|
||||||
|
// shutdown status to avoid sending an error if we don't need to.
|
||||||
|
_, ok := <-p.done
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// force a shutdown of all running jobs
|
||||||
|
p.shutdownJobs()
|
||||||
|
|
||||||
|
// wait for running jobs to report their status to Gitea
|
||||||
|
_, _ = <-p.done
|
||||||
|
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for {
|
for {
|
||||||
if err := limiter.Wait(ctx); err != nil {
|
if err := limiter.Wait(p.pollingCtx); err != nil {
|
||||||
if ctx.Err() != nil {
|
if p.pollingCtx.Err() != nil {
|
||||||
log.WithError(err).Debug("limiter wait failed")
|
log.WithError(err).Debug("limiter wait failed")
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
task, ok := p.fetchTask(ctx)
|
task, ok := p.fetchTask(p.pollingCtx)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
p.runTaskWithRecover(ctx, task)
|
|
||||||
|
p.runTaskWithRecover(p.jobsCtx, task)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,11 @@ runner:
|
||||||
# Please note that the Forgejo instance also has a timeout (3h by default) for the job.
|
# Please note that the Forgejo instance also has a timeout (3h by default) for the job.
|
||||||
# So the job could be stopped by the Forgejo instance if it's timeout is shorter than this.
|
# So the job could be stopped by the Forgejo instance if it's timeout is shorter than this.
|
||||||
timeout: 3h
|
timeout: 3h
|
||||||
# Whether skip verifying the TLS certificate of the Forgejo instance.
|
# The timeout for the runner to wait for running jobs to finish when shutting down.
|
||||||
|
# Any running jobs that haven't finished after this timeout will be cancelled.
|
||||||
|
# Defaults to the same value as timeout if unset or 0.
|
||||||
|
shutdown_timeout: 3h
|
||||||
|
# Whether skip verifying the TLS certificate of the instance.
|
||||||
insecure: false
|
insecure: false
|
||||||
# The timeout for fetching the job from the Forgejo instance.
|
# The timeout for fetching the job from the Forgejo instance.
|
||||||
fetch_timeout: 5s
|
fetch_timeout: 5s
|
||||||
|
|
|
@ -21,15 +21,16 @@ type Log struct {
|
||||||
|
|
||||||
// Runner represents the configuration for the runner.
|
// Runner represents the configuration for the runner.
|
||||||
type Runner struct {
|
type Runner struct {
|
||||||
File string `yaml:"file"` // File specifies the file path for the runner.
|
File string `yaml:"file"` // File specifies the file path for the runner.
|
||||||
Capacity int `yaml:"capacity"` // Capacity specifies the capacity of the runner.
|
Capacity int `yaml:"capacity"` // Capacity specifies the capacity of the runner.
|
||||||
Envs map[string]string `yaml:"envs"` // Envs stores environment variables for the runner.
|
Envs map[string]string `yaml:"envs"` // Envs stores environment variables for the runner.
|
||||||
EnvFile string `yaml:"env_file"` // EnvFile specifies the path to the file containing environment variables for the runner.
|
EnvFile string `yaml:"env_file"` // EnvFile specifies the path to the file containing environment variables for the runner.
|
||||||
Timeout time.Duration `yaml:"timeout"` // Timeout specifies the duration for runner timeout.
|
Timeout time.Duration `yaml:"timeout"` // Timeout specifies the duration for runner timeout.
|
||||||
Insecure bool `yaml:"insecure"` // Insecure indicates whether the runner operates in an insecure mode.
|
ShutdownTimeout time.Duration `yaml:"shutdown_timeout"` // ShutdownTimeout specifies the duration to wait for running jobs to complete during a shutdown of the runner.
|
||||||
FetchTimeout time.Duration `yaml:"fetch_timeout"` // FetchTimeout specifies the timeout duration for fetching resources.
|
Insecure bool `yaml:"insecure"` // Insecure indicates whether the runner operates in an insecure mode.
|
||||||
FetchInterval time.Duration `yaml:"fetch_interval"` // FetchInterval specifies the interval duration for fetching resources.
|
FetchTimeout time.Duration `yaml:"fetch_timeout"` // FetchTimeout specifies the timeout duration for fetching resources.
|
||||||
Labels []string `yaml:"labels"` // Labels specifies the labels of the runner. Labels are declared on each startup
|
FetchInterval time.Duration `yaml:"fetch_interval"` // FetchInterval specifies the interval duration for fetching resources.
|
||||||
|
Labels []string `yaml:"labels"` // Labels specify the labels of the runner. Labels are declared on each startup
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache represents the configuration for caching.
|
// Cache represents the configuration for caching.
|
||||||
|
@ -120,6 +121,9 @@ func LoadDefault(file string) (*Config, error) {
|
||||||
if cfg.Runner.Timeout <= 0 {
|
if cfg.Runner.Timeout <= 0 {
|
||||||
cfg.Runner.Timeout = 3 * time.Hour
|
cfg.Runner.Timeout = 3 * time.Hour
|
||||||
}
|
}
|
||||||
|
if cfg.Runner.ShutdownTimeout <= 0 {
|
||||||
|
cfg.Runner.ShutdownTimeout = cfg.Runner.Timeout
|
||||||
|
}
|
||||||
if cfg.Cache.Enabled == nil {
|
if cfg.Cache.Enabled == nil {
|
||||||
b := true
|
b := true
|
||||||
cfg.Cache.Enabled = &b
|
cfg.Cache.Enabled = &b
|
||||||
|
|
|
@ -45,4 +45,4 @@ fi
|
||||||
# Prevent reading the token from the forgejo-runner process
|
# Prevent reading the token from the forgejo-runner process
|
||||||
unset GITEA_RUNNER_REGISTRATION_TOKEN
|
unset GITEA_RUNNER_REGISTRATION_TOKEN
|
||||||
|
|
||||||
forgejo-runner daemon ${CONFIG_ARG}
|
exec forgejo-runner daemon ${CONFIG_ARG}
|
||||||
|
|
Loading…
Reference in a new issue