Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/apis/llm/llm_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (

/* 启动失败 */
LLM_STATUS_START_FAIL = "start_fail"
/* 探测中 */
LLM_STATUS_PROBING = "probing"
/* 停机失败 */
LLM_STATUS_STOP_FAILED = "stop_fail"

Expand Down
96 changes: 96 additions & 0 deletions pkg/hostman/container/prober/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ limitations under the License.
package prober

import (
"context"
"fmt"
"io"
"net"
nethttp "net/http"
"strings"
"time"

Expand All @@ -46,20 +49,23 @@ import (
"yunion.io/x/onecloud/pkg/util/exec"
"yunion.io/x/onecloud/pkg/util/probe"
execprobe "yunion.io/x/onecloud/pkg/util/probe/exec"
httpprobe "yunion.io/x/onecloud/pkg/util/probe/http"
tcpprobe "yunion.io/x/onecloud/pkg/util/probe/tcp"
)

const maxProbeRetries = 3

// Prober helps to check the liveness of a container.
type prober struct {
http httpprobe.Prober
exec execprobe.Prober
tcp tcpprobe.Prober
runner container.CommandRunner
}

func newProber(runner container.CommandRunner) *prober {
return &prober{
http: httpprobe.New(),
exec: execprobe.New(),
tcp: tcpprobe.New(),
runner: runner,
Expand Down Expand Up @@ -124,6 +130,78 @@ func (pb *prober) runProbeWithRetries(probeType apis.ContainerProbeType, p *apis
return result, output, err
}

func (pb *prober) runProbeInPodNetNS(pod IPod, run func() (probe.Result, string, error)) (probe.Result, string, error) {
netNSRunner, ok := pb.runner.(container.PodNetNSRunner)
if !ok {
log.Infof("[startup-probe-trace] run probe without pod netns pod=%s", pod.GetId())
return run()
}
var result probe.Result
var output string
log.Infof("[startup-probe-trace] enter pod netns pod=%s", pod.GetId())
err := netNSRunner.RunInPodNetNS(pod.GetId(), func() error {
var runErr error
result, output, runErr = run()
return runErr
})
if err != nil {
log.Errorf("[startup-probe-trace] pod netns probe error pod=%s error=%v", pod.GetId(), err)
return probe.Unknown, "", err
}
log.Infof("[startup-probe-trace] leave pod netns pod=%s result=%s output=%q", pod.GetId(), result, output)
return result, output, nil
}

func (pb *prober) shouldRunProbeInPodNetNS() bool {
_, ok := pb.runner.(container.PodNetNSRunner)
return ok
}

func (pb *prober) newPodNetNSDialContext(pod IPod) httpprobe.DialContextFunc {
netNSRunner, ok := pb.runner.(container.PodNetNSRunner)
if !ok {
return nil
}
return func(ctx context.Context, network string, address string) (net.Conn, error) {
var conn net.Conn
dialer := &net.Dialer{}
log.Infof("[startup-probe-trace] enter pod netns dial pod=%s network=%s address=%s", pod.GetId(), network, address)
err := netNSRunner.RunInPodNetNS(pod.GetId(), func() error {
var dialErr error
conn, dialErr = dialer.DialContext(ctx, network, address)
return dialErr
})
if err != nil {
log.Errorf("[startup-probe-trace] pod netns dial error pod=%s network=%s address=%s error=%v", pod.GetId(), network, address, err)
return nil, err
}
log.Infof("[startup-probe-trace] leave pod netns dial pod=%s network=%s address=%s", pod.GetId(), network, address)
return conn, nil
}
}

func getProbeHost(explicitHost string, pod IPod) (string, error) {
if explicitHost != "" {
return explicitHost, nil
}
for _, nic := range pod.GetDesc().Nics {
if nic.Ip != "" {
return nic.Ip, nil
}
}
return "", errors.Errorf("not found guest ip")
}

func (pb *prober) getProbeHost(explicitHost string, pod IPod) (string, error) {
if explicitHost != "" {
return explicitHost, nil
}
if pb.shouldRunProbeInPodNetNS() {
return "127.0.0.1", nil
}
return getProbeHost(explicitHost, pod)
}

func (pb *prober) runProbe(probeType apis.ContainerProbeType, p *apis.ContainerProbe, pod IPod, container *hostapi.ContainerDesc) (probe.Result, string, error) {
timeout := time.Duration(p.TimeoutSeconds) * time.Second
if p.Exec != nil {
Expand All @@ -147,6 +225,24 @@ func (pb *prober) runProbe(probeType apis.ContainerProbeType, p *apis.ContainerP
// log.Debugf("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
return pb.tcp.Probe(host, port, timeout)
}
if p.HTTPGet != nil {
host, err := pb.getProbeHost(p.HTTPGet.Host, pod)
if err != nil {
return probe.Unknown, "", err
}
headers := nethttp.Header{}
for _, header := range p.HTTPGet.HTTPHeaders {
headers.Add(header.Name, header.Value)
}
log.Infof("[startup-probe-trace] http probe pod=%s container=%s scheme=%s host=%s port=%d path=%s timeout=%s in_pod_netns=%v", pod.GetId(), container.Id, p.HTTPGet.Scheme, host, p.HTTPGet.Port, p.HTTPGet.Path, timeout, pb.shouldRunProbeInPodNetNS())
httpProber := pb.http
if dialContext := pb.newPodNetNSDialContext(pod); dialContext != nil {
httpProber = httpprobe.NewWithDialContext(dialContext)
}
result, output, err := httpProber.Probe(string(p.HTTPGet.Scheme), host, p.HTTPGet.Port, p.HTTPGet.Path, headers, timeout)
log.Infof("[startup-probe-trace] http probe result pod=%s container=%s result=%s output=%q error=%v", pod.GetId(), container.Id, result, output, err)
return result, output, err
}
errMsg := fmt.Sprintf("Failed to find probe builder for pod %v, container: %v", pod.GetName(), container.Name)
log.Warningf("%s", errMsg)
return probe.Unknown, "", errors.Error(errMsg)
Expand Down
34 changes: 31 additions & 3 deletions pkg/hostman/container/prober/prober_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"yunion.io/x/pkg/util/wait"

"yunion.io/x/onecloud/pkg/apis"
computeapi "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/apis/host"
"yunion.io/x/onecloud/pkg/hostman/container/prober/results"
"yunion.io/x/onecloud/pkg/hostman/container/status"
Expand Down Expand Up @@ -87,6 +88,8 @@ type Manager interface {
Start()

SetDirtyContainer(ctrId string, reason string)

GetContainerStartupStatus(ctrId string) (string, bool)
}

type manager struct {
Expand Down Expand Up @@ -138,11 +141,33 @@ func (m *manager) cleanDirtyContainer(ctrId string) {
m.dirtyContainers.Delete(ctrId)
}

func (m *manager) GetContainerStartupStatus(ctrId string) (string, bool) {
if _, ok := m.dirtyContainers.Load(ctrId); ok {
return computeapi.CONTAINER_STATUS_PROBING, true
}
result, ok := m.startupManager.Get(ctrId)
if !ok {
return "", false
}
switch result.Result {
case results.Success:
return computeapi.CONTAINER_STATUS_RUNNING, true
case results.Failure:
if result.IsNetFailedError() {
return computeapi.CONTAINER_STATUS_NET_FAILED, true
}
return computeapi.CONTAINER_STATUS_PROBE_FAILED, true
default:
return computeapi.CONTAINER_STATUS_PROBING, true
}
}

// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
// start syncing readiness.
//go wait.Forever(m.updateReadiness, 0)
// start syncing startup.
log.Infof("[startup-probe-trace] manager start")
go wait.Forever(m.updateStartup, 0)
}

Expand All @@ -151,14 +176,17 @@ func (m *manager) AddPod(pod IPod) {
defer m.workerLock.Unlock()

key := probeKey{podUid: pod.GetId()}
for _, c := range pod.GetContainers() {
containers := pod.GetContainers()
log.Infof("[startup-probe-trace] AddPod pod=%s name=%s containers=%d", pod.GetId(), pod.GetName(), len(containers))
for _, c := range containers {
key.containerName = c.Name
if c.Spec.StartupProbe != nil {
key.probeType = apis.ContainerProbeTypeStartup
if _, ok := m.workers[key]; ok {
log.Errorf("Startup probe already exists: %s:%s", pod.GetName(), c.Name)
return
log.Infof("[startup-probe-trace] startup worker exists pod=%s container=%s", pod.GetId(), c.Id)
continue
}
log.Infof("[startup-probe-trace] create startup worker pod=%s container=%s name=%s period=%d timeout=%d failure=%d success=%d", pod.GetId(), c.Id, c.Name, c.Spec.StartupProbe.PeriodSeconds, c.Spec.StartupProbe.TimeoutSeconds, c.Spec.StartupProbe.FailureThreshold, c.Spec.StartupProbe.SuccessThreshold)
w := newWorker(m, key.probeType, pod, c)
m.workers[key] = w
go w.run()
Expand Down
15 changes: 14 additions & 1 deletion pkg/hostman/container/prober/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type worker struct {
lastResult results.Result
// How many times in a row the probe has returned the same result.
resultRun int
// Total probe attempts made by this worker.
probeAttempts int

// If set, skip probing
onHold bool
Expand Down Expand Up @@ -112,14 +114,17 @@ func newWorker(
// run periodically probes the container.
func (w *worker) run() {
probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
jitter := time.Duration(rand.Float64() * float64(probeTickerPeriod))
log.Infof("[startup-probe-trace] worker start pod=%s container=%s name=%s type=%s period=%s jitter=%s", w.pod.GetId(), w.container.Id, w.container.Name, w.probeType, probeTickerPeriod, jitter)

// If host restarted the probes could be started in rapid succession.
// Let the worker wait for a random portion of tickerPeriod before probing.
time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
time.Sleep(jitter)

probeTicker := time.NewTicker(probeTickerPeriod)

defer func() {
log.Infof("[startup-probe-trace] worker stop pod=%s container=%s name=%s type=%s attempts=%d", w.pod.GetId(), w.container.Id, w.container.Name, w.probeType, w.probeAttempts)
// Clean up.
probeTicker.Stop()
if len(w.containerId) != 0 {
Expand Down Expand Up @@ -159,6 +164,11 @@ func (w *worker) doProbe() (keepGoing bool) {
keepGoing = true
})

w.probeAttempts++
if w.probeAttempts <= 3 {
log.Infof("[startup-probe-trace] probe attempt=%d pod=%s container=%s name=%s type=%s", w.probeAttempts, w.pod.GetId(), w.container.Id, w.container.Name, w.probeType)
}
prevResult := w.lastResult
result, err := w.probeManager.prober.probe(w.probeType, w.pod, w.container)
if err != nil {
log.Errorf("probe: %s, pod: %s, container: %s, error: %v", w.probeType, w.pod.GetId(), w.container.Id, err)
Expand All @@ -172,6 +182,9 @@ func (w *worker) doProbe() (keepGoing bool) {
w.lastResult = result.Result
w.resultRun = 1
}
if w.probeAttempts <= 3 || prevResult != result.Result {
log.Infof("[startup-probe-trace] probe result pod=%s container=%s name=%s type=%s result=%s run=%d reason=%q", w.pod.GetId(), w.container.Id, w.container.Name, w.probeType, result.Result, w.resultRun, result.Reason)
}
_, isContainerDirty := w.probeManager.dirtyContainers.Load(w.container.Id)

if (result.Result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
Expand Down
5 changes: 5 additions & 0 deletions pkg/hostman/guestman/container/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@ type CommandRunner interface {
// RunInContainer synchronously executes the command in the container, and returns the output.
RunInContainer(podId string, containerId string, cmd []string, timeout time.Duration) ([]byte, error)
}

type PodNetNSRunner interface {
// RunInPodNetNS synchronously executes run in the pod network namespace.
RunInPodNetNS(podId string, run func() error) error
}
14 changes: 13 additions & 1 deletion pkg/hostman/guestman/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,13 @@ func (s *sPodGuestInstance) StartLocalContainer(ctx context.Context, userCred mc
return ret, nil
}

func (s *sPodGuestInstance) ensureContainerProbeStarted(ctrId string, spec *hostapi.ContainerSpec, reason string) {
if spec != nil && spec.NeedProbe() {
s.getProbeManager().SetDirtyContainer(ctrId, reason)
s.getProbeManager().AddPod(s)
}
}

func (s *sPodGuestInstance) StartContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error) {
_, hasCtr := s.containers[ctrId]
needRecreate := false
Expand Down Expand Up @@ -1409,6 +1416,7 @@ func (s *sPodGuestInstance) StartContainer(ctx context.Context, userCred mcclien
if err := s.getCRI().StartContainer(ctx, criId); err != nil {
return nil, errors.Wrap(err, "CRI.StartContainer")
}
s.ensureContainerProbeStarted(ctrId, input.Spec, "container started")

// 如果是 cgroup v2,设备规则已经通过 containerd API 在 CreateContainer 时设置,跳过 eBPF 方式
// 如果是 cgroup v1,继续使用原有的 eBPF 方式
Expand Down Expand Up @@ -2669,7 +2677,11 @@ func (s *sPodGuestInstance) getContainerStatus(ctx context.Context, ctrId string
return "", cs, errors.Wrapf(httperrors.ErrNotFound, "not found container by id %s", ctrId)
}
if ctr.Spec.NeedProbe() {
status = computeapi.CONTAINER_STATUS_PROBING
if probeStatus, ok := s.getProbeManager().GetContainerStartupStatus(ctrId); ok {
status = probeStatus
} else {
status = computeapi.CONTAINER_STATUS_PROBING
}
}
}
if status == computeapi.CONTAINER_STATUS_EXITED && resp.Status.ExitCode != 0 {
Expand Down
Loading
Loading