diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 00000000..835d5104 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,126 @@ +name: Release + +on: + push: + tags: + - 'v*' + +permissions: + contents: write + +jobs: + setup: + name: Setup + runs-on: ubuntu-latest + outputs: + kernel-version: ${{ steps.set-vars.outputs.kernel-version }} + version: ${{ steps.set-vars.outputs.version }} + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + sparse-checkout: | + .github/.tool-versions + + - name: Set variables + id: set-vars + run: | + kernel_version=$(grep -E '^kernel [0-9.]+$' .github/.tool-versions | sed -E 's/^kernel ([0-9.]+)$/\1/') + echo "kernel-version=${kernel_version}" >> $GITHUB_OUTPUT + + version=${GITHUB_REF#refs/tags/v} + echo "version=${version}" >> $GITHUB_OUTPUT + + build: + name: Build artifacts + needs: setup + runs-on: ubuntu-latest + timeout-minutes: 120 + strategy: + fail-fast: true + matrix: + include: + - arch: x86_64 + docker_arch: amd64 + + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + + - uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # v4.0.0 + + - name: Build kernel + run: | + docker buildx bake kernel \ + --set kernel.args.KERNEL_VERSION=${{ needs.setup.outputs.kernel-version }} \ + --set kernel.args.KERNEL_ARCH=${{ matrix.arch }} + + - name: Build host and guest binaries + run: | + docker buildx bake host-binaries guest-binaries \ + --set '*.args.KERNEL_ARCH=${{ matrix.arch }}' + + - name: Verify artifacts + run: | + echo "Verifying build artifacts:" + ls -lh _output/ + file _output/nerdbox-kernel-${{ matrix.arch }} + file _output/nerdbox-initrd + file _output/containerd-shim-nerdbox-v1 + file _output/libkrun.so + + - name: Create release archive + run: | + ARCHIVE_DIR="nerdbox-${{ needs.setup.outputs.version }}-linux-${{ matrix.docker_arch }}" + mkdir -p "${ARCHIVE_DIR}" + + cp _output/containerd-shim-nerdbox-v1 "${ARCHIVE_DIR}/" + cp _output/nerdbox-initrd "${ARCHIVE_DIR}/" + cp _output/nerdbox-kernel-${{ matrix.arch }} "${ARCHIVE_DIR}/" + cp _output/libkrun.so "${ARCHIVE_DIR}/libkrun-nerdbox.so" + + tar -czf "${ARCHIVE_DIR}.tar.gz" "${ARCHIVE_DIR}/" + + sha256sum "${ARCHIVE_DIR}.tar.gz" > "${ARCHIVE_DIR}.tar.gz.sha256sum" + + - name: Upload artifacts + uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 + with: + name: release-artifacts-linux-${{ matrix.docker_arch }} + path: | + nerdbox-*.tar.gz + nerdbox-*.tar.gz.sha256sum + + release: + name: Create Release + needs: [setup, build] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + + - name: Download artifacts + uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0 + with: + path: release-artifacts + merge-multiple: true + + - name: List release artifacts + run: ls -lh release-artifacts/ + + - name: Determine pre-release status + id: prerelease + run: | + VERSION="${{ needs.setup.outputs.version }}" + if [[ "${VERSION}" == *"beta"* ]] || [[ "${VERSION}" == *"rc"* ]] || [[ "${VERSION}" == *"alpha"* ]]; then + echo "prerelease=true" >> $GITHUB_OUTPUT + else + echo "prerelease=false" >> $GITHUB_OUTPUT + fi + + - name: Create GitHub Release + uses: softprops/action-gh-release@c95fe1489396fe8a9eb87c0abf8aa5b2ef267fda # v2.2.1 + with: + draft: false + prerelease: ${{ steps.prerelease.outputs.prerelease }} + generate_release_notes: true + make_latest: ${{ steps.prerelease.outputs.prerelease == 'false' }} + files: | + release-artifacts/* diff --git a/Taskfile.yml b/Taskfile.yml index 4727c10d..af154c21 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -67,7 +67,7 @@ tasks: sh: go env GOARCH sources: - test/testbin/main.go - - vendor/github.com/dmcgowan/shimtest/testbin/testbin.go + - vendor/github.com/containerd/shimtest/testbin/testbin.go generates: - test/shim/testdata/testbin - test/stress/testdata/testbin diff --git a/go.mod b/go.mod index d00656de..3918d771 100644 --- a/go.mod +++ b/go.mod @@ -15,9 +15,9 @@ require ( github.com/containerd/log v0.1.1-0.20260403072107-cb1839ebf76b github.com/containerd/otelttrpc v0.1.0 github.com/containerd/plugin v1.1.0 + github.com/containerd/shimtest v0.2.1 github.com/containerd/ttrpc v1.2.9-0.20260501231634-6c2eed2b612e github.com/containerd/typeurl/v2 v2.3.0 - github.com/dmcgowan/shimtest v0.1.8 github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c github.com/ebitengine/purego v0.10.1 github.com/insomniacslk/dhcp v0.0.0-20250919081422-f80a1952f48e diff --git a/go.sum b/go.sum index 66fc8bdd..ebfd4525 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/containerd/platforms v1.0.0-rc.4 h1:M42JrUT4zfZTqtkUwkr0GzmUWbfyO5VO0 github.com/containerd/platforms v1.0.0-rc.4/go.mod h1:lKlMXyLybmBedS/JJm11uDofzI8L2v0J2ZbYvNsbq1A= github.com/containerd/plugin v1.1.0 h1:O+7lczNJVMy8rz0YNx3xGB8tTf5qY4i5abF041Ew19U= github.com/containerd/plugin v1.1.0/go.mod h1:qBTum+A8lJ6lO44A19Eo7y1OlcLj4OWFH1DA/vnHmcc= +github.com/containerd/shimtest v0.2.1 h1:v6DRcuU5TjwX9Qu+Q8suYvRp13UoJnDk5SPJKVIql3Q= +github.com/containerd/shimtest v0.2.1/go.mod h1:v9b7phlmKrfn9zKHqhDyoe0kv24mxDEYyJlXFcFhjnI= github.com/containerd/ttrpc v1.2.9-0.20260501231634-6c2eed2b612e h1:uMP9FpdM40x+cvSyg6PiiINN9/b2908f8CsF/fZ438g= github.com/containerd/ttrpc v1.2.9-0.20260501231634-6c2eed2b612e/go.mod h1:IvZPGIALrdh9ZNv7AvRrKHCfwVPPCueLO2yuwj3KIqE= github.com/containerd/typeurl/v2 v2.3.0 h1:HZHPhRWo5XMy3QGQoPrUzbW/2ckwjfweHmOwlkIrPAQ= @@ -47,8 +49,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dmcgowan/shimtest v0.1.8 h1:+HAqYd9EfClqrOHQ4vR3sTrW2DGpgb/vqdlUS7QRkZc= -github.com/dmcgowan/shimtest v0.1.8/go.mod h1:vV3SFMMBAY8xLOp8bO2xL3hFwlzCZWhVGTgKfV0L1EE= github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c h1:+pKlWGMw7gf6bQ+oDZB4KHQFypsfjYlq/C4rfL7D3g8= github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= diff --git a/internal/shim/task/io.go b/internal/shim/task/io.go index 48bfeb54..49807eb9 100644 --- a/internal/shim/task/io.go +++ b/internal/shim/task/io.go @@ -46,14 +46,14 @@ func generateStreamID(prefix string) string { return fmt.Sprintf("%s-%d-%s", prefix, time.Now().UnixNano(), base64.RawURLEncoding.EncodeToString(b[:])) } -func (s *service) forwardIO(ctx context.Context, ss streamCreator, idPrefix string, sio stdio.Stdio) (stdio.Stdio, func(ctx context.Context) error, error) { +func (s *service) forwardIO(ctx context.Context, ss streamCreator, idPrefix string, sio stdio.Stdio) (stdio.Stdio, func(ctx context.Context) error, <-chan struct{}, func() error, error) { pio := sio if pio.IsNull() { - return pio, nil, nil + return pio, nil, nil, nil, nil } u, err := url.Parse(pio.Stdout) if err != nil { - return stdio.Stdio{}, nil, fmt.Errorf("unable to parse stdout uri: %w", err) + return stdio.Stdio{}, nil, nil, nil, fmt.Errorf("unable to parse stdout uri: %w", err) } if u.Scheme == "" { u.Scheme = defaultScheme @@ -62,37 +62,37 @@ func (s *service) forwardIO(ctx context.Context, ss streamCreator, idPrefix stri switch u.Scheme { case "stream": // Pass through - return pio, nil, nil + return pio, nil, nil, nil, nil case "fifo", "pipe": pio, streams, err = createStreams(ctx, ss, idPrefix, pio) if err != nil { - return stdio.Stdio{}, nil, err + return stdio.Stdio{}, nil, nil, nil, err } //pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio)) case "file": filePath := u.Path if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil { - return stdio.Stdio{}, nil, err + return stdio.Stdio{}, nil, nil, nil, err } var f *os.File f, err = os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - return stdio.Stdio{}, nil, err + return stdio.Stdio{}, nil, nil, nil, err } f.Close() pio.Stdout = filePath pio.Stderr = filePath pio, streams, err = createStreams(ctx, ss, idPrefix, pio) if err != nil { - return stdio.Stdio{}, nil, err + return stdio.Stdio{}, nil, nil, nil, err } default: // TODO: Support "binary" - return stdio.Stdio{}, nil, fmt.Errorf("unsupported STDIO scheme %s: %w", u.Scheme, errdefs.ErrNotImplemented) + return stdio.Stdio{}, nil, nil, nil, fmt.Errorf("unsupported STDIO scheme %s: %w", u.Scheme, errdefs.ErrNotImplemented) } if err != nil { - return stdio.Stdio{}, nil, err + return stdio.Stdio{}, nil, nil, nil, err } defer func() { @@ -105,25 +105,17 @@ func (s *service) forwardIO(ctx context.Context, ss streamCreator, idPrefix stri } }() ioDone := make(chan struct{}) - if err = copyStreams(ctx, streams, sio.Stdin, sio.Stdout, sio.Stderr, ioDone); err != nil { - return stdio.Stdio{}, nil, err + stdinEOF, err := copyStreams(ctx, streams, sio.Stdin, sio.Stdout, sio.Stderr, ioDone) + if err != nil { + return stdio.Stdio{}, nil, nil, nil, err } return pio, func(ctx context.Context) error { - // Wait for the copy goroutines to finish draining before closing - // the stream connections. Closing first causes goroutines that are - // mid-Read to see "use of closed network connection" and return - // early, dropping bytes still buffered in the kernel socket receive - // queue (the close-before-drain race). - // - // In normal operation ioDone always fires on its own: the container - // process exiting closes the runc pipe, vminitd's copy goroutine - // sees EOF and closes its vsock conn, which propagates EOF to the - // host copy goroutines here. ctx.Done() is only reached in - // exceptional cases (VM crash, vminitd hang, kernel wedge). - // - // Ensure the wait is always bounded: if the caller did not provide - // a deadline, apply a default so a wedged guest cannot pin cleanup - // indefinitely. + // ioDone is expected to already be closed by the time ioShutdown + // is called: the host Wait handler blocks until ioDone fires before + // returning to the caller, ensuring all buffered bytes have been + // drained to the FIFO before Delete is issued. This select is a + // safety net for error paths (VM crash, vminitd hang) where Wait + // may not have been called or ioDone may not have fired naturally. if _, ok := ctx.Deadline(); !ok { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, 30*time.Second) @@ -141,7 +133,7 @@ func (s *service) forwardIO(ctx context.Context, ss streamCreator, idPrefix stri } } return err - }, nil + }, ioDone, stdinEOF, nil } func createStreams(ctx context.Context, ss streamCreator, idPrefix string, io stdio.Stdio) (_ stdio.Stdio, conns [3]io.ReadWriteCloser, err error) { diff --git a/internal/shim/task/io_copystreams.go b/internal/shim/task/io_copystreams.go new file mode 100644 index 00000000..5f798017 --- /dev/null +++ b/internal/shim/task/io_copystreams.go @@ -0,0 +1,74 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package task + +import ( + "context" + "io" + + "github.com/containerd/log" +) + +// copyStdinUntilClose reads from f and writes raw bytes to sc until closeCh +// is closed (CloseIO) or f delivers EOF. On either exit it calls +// sc.CloseWrite() to send OP_SHUTDOWN(SEND) in-order on the vsock stdin +// stream, guaranteeing the guest sees EOF after all data already written — +// not via an out-of-band RPC that could race in-flight bytes. +func copyStdinUntilClose(ctx context.Context, sc interface { + io.Writer + CloseWrite() error +}, f io.Reader, buf []byte, closeCh <-chan struct{}) { + type readResult struct { + n int + err error + } + readCh := make(chan readResult, 1) + for { + go func() { + n, err := f.Read(buf) + readCh <- readResult{n, err} + }() + select { + case <-closeCh: + // CloseIO fired: drain the pending read then send in-band EOF. + res := <-readCh + if res.n > 0 { + if _, err := sc.Write(buf[:res.n]); err != nil { + log.G(ctx).WithError(err).Warn("error writing stdin on CloseIO") + } + } + if err := sc.CloseWrite(); err != nil { + log.G(ctx).WithError(err).Warn("error sending stdin EOF via CloseWrite") + } + return + case res := <-readCh: + if res.n > 0 { + if _, err := sc.Write(buf[:res.n]); err != nil { + log.G(ctx).WithError(err).Warn("error writing stdin") + return + } + } + if res.err != nil { + // Pipe/named-pipe EOF: client closed its write end. + if err := sc.CloseWrite(); err != nil { + log.G(ctx).WithError(err).Warn("error sending stdin EOF on pipe close") + } + return + } + } + } +} diff --git a/internal/shim/task/io_copystreams_unix.go b/internal/shim/task/io_copystreams_unix.go index ca0597d6..889ffa76 100644 --- a/internal/shim/task/io_copystreams_unix.go +++ b/internal/shim/task/io_copystreams_unix.go @@ -31,7 +31,20 @@ import ( "github.com/containerd/log" ) -func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdout, stderr string, done chan struct{}) error { +// stdinStreamWriteCloser is the interface the host-side stdin vsock stream +// connection must implement. CloseWrite sends OP_SHUTDOWN(SEND) in-order +// after all data, delivering EOF to the guest without a destructive transport +// close. Asserting at setup time ensures a future wrapper that drops +// CloseWrite fails loudly rather than silently hanging the guest's read. +type stdinStreamWriteCloser interface { + io.ReadWriteCloser + CloseWrite() error +} + +// copyStreams returns a stdinEOF function that, when called (by CloseIO), +// signals the stdin goroutine to stop reading the FIFO and send the +// OP_SHUTDOWN(SEND) in-band EOF to the guest. It is nil when stdin is empty. +func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdout, stderr string, done chan struct{}) (stdinEOF func() error, err error) { var cwg sync.WaitGroup var copying atomic.Int32 copying.Store(2) @@ -90,7 +103,7 @@ func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdo } ok, err := fifo.IsFifo(i.name) if err != nil { - return err + return nil, err } var ( fw io.WriteCloser @@ -98,10 +111,10 @@ func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdo ) if ok { if fw, err = fifo.OpenFifo(ctx, i.name, syscall.O_WRONLY, 0); err != nil { - return fmt.Errorf("containerd-shim: opening w/o fifo %q failed: %w", i.name, err) + return nil, fmt.Errorf("containerd-shim: opening w/o fifo %q failed: %w", i.name, err) } if fr, err = fifo.OpenFifo(ctx, i.name, syscall.O_RDONLY, 0); err != nil { - return fmt.Errorf("containerd-shim: opening r/o fifo %q failed: %w", i.name, err) + return nil, fmt.Errorf("containerd-shim: opening r/o fifo %q failed: %w", i.name, err) } } else { if sameFile != nil { @@ -110,7 +123,7 @@ func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdo continue } if fw, err = os.OpenFile(i.name, syscall.O_WRONLY|syscall.O_APPEND, 0); err != nil { - return fmt.Errorf("containerd-shim: opening file %q failed: %w", i.name, err) + return nil, fmt.Errorf("containerd-shim: opening file %q failed: %w", i.name, err) } if stdout == stderr { sameFile = newCountingWriteCloser(fw, 1) @@ -119,21 +132,37 @@ func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdo i.dest(fw, fr) } if stdin != "" { + // Assert early: the stdin vsock stream must implement CloseWrite so + // we can send OP_SHUTDOWN(SEND) in-order when CloseIO fires, rather + // than forwarding an out-of-band RPC that races in-flight bytes. + sc, ok := streams[0].(stdinStreamWriteCloser) + if !ok { + return nil, fmt.Errorf("stdin stream connection does not implement CloseWrite; vsock conn required") + } f, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) if err != nil { - return fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err) + return nil, fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err) } + // closeCh is closed by the stdinEOF function (triggered by CloseIO). + closeCh := make(chan struct{}) cwg.Add(1) go func() { cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) - - io.CopyBuffer(streams[0], f, *p) - streams[0].Close() + copyStdinUntilClose(ctx, sc, f, *p, closeCh) + // Do NOT Close sc here; deferred to ioShutdown/forwardIO cleanup + // so the transport outlives the in-band EOF and the host can + // close its end cleanly after the guest drains. f.Close() }() + stdinEOF = func() error { + // Signal the goroutine to stop reading the FIFO and send + // OP_SHUTDOWN(SEND) in-order on the stdin stream. + close(closeCh) + return nil + } } cwg.Wait() - return nil + return stdinEOF, nil } diff --git a/internal/shim/task/io_copystreams_windows.go b/internal/shim/task/io_copystreams_windows.go index e33376cc..81d036f8 100644 --- a/internal/shim/task/io_copystreams_windows.go +++ b/internal/shim/task/io_copystreams_windows.go @@ -31,7 +31,14 @@ import ( "github.com/containerd/log" ) -func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdout, stderr string, done chan struct{}) error { +// stdinStreamWriteCloser mirrors the Unix definition; both builds need it for +// the CloseIO-driven in-band stdin EOF. +type stdinStreamWriteCloser interface { + io.ReadWriteCloser + CloseWrite() error +} + +func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdout, stderr string, done chan struct{}) (stdinEOF func() error, err error) { var cwg sync.WaitGroup var copying atomic.Int32 copying.Store(2) @@ -90,7 +97,7 @@ func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdo } var ( - fw io.WriteCloser + fw io.WriteCloser err error ) @@ -99,7 +106,7 @@ func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdo if isNamedPipe(i.name) { fw, err = winio.DialPipe(i.name, &pipeDialTimeout) if err != nil { - return fmt.Errorf("containerd-shim: connecting to named pipe %q failed: %w", i.name, err) + return nil, fmt.Errorf("containerd-shim: connecting to named pipe %q failed: %w", i.name, err) } } else { if sameFile != nil { @@ -108,7 +115,7 @@ func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdo continue } if fw, err = os.OpenFile(i.name, os.O_WRONLY|os.O_APPEND, 0); err != nil { - return fmt.Errorf("containerd-shim: opening file %q failed: %w", i.name, err) + return nil, fmt.Errorf("containerd-shim: opening file %q failed: %w", i.name, err) } if stdout == stderr { sameFile = newCountingWriteCloser(fw, 1) @@ -117,33 +124,40 @@ func copyStreams(ctx context.Context, streams [3]io.ReadWriteCloser, stdin, stdo i.dest(fw, nil) } if stdin != "" { + sc, ok := streams[0].(stdinStreamWriteCloser) + if !ok { + return nil, fmt.Errorf("stdin stream connection does not implement CloseWrite; vsock conn required") + } var f io.ReadCloser if isNamedPipe(stdin) { conn, err := winio.DialPipe(stdin, &pipeDialTimeout) if err != nil { - return fmt.Errorf("containerd-shim: connecting to named pipe %q for stdin failed: %w", stdin, err) + return nil, fmt.Errorf("containerd-shim: connecting to named pipe %q for stdin failed: %w", stdin, err) } f = conn } else { var err error f, err = os.Open(stdin) if err != nil { - return fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err) + return nil, fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err) } } + closeCh := make(chan struct{}) cwg.Add(1) go func() { cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) - - io.CopyBuffer(streams[0], f, *p) - streams[0].Close() + copyStdinUntilClose(ctx, sc, f, *p, closeCh) f.Close() }() + stdinEOF = func() error { + close(closeCh) + return nil + } } cwg.Wait() - return nil + return stdinEOF, nil } // isNamedPipe checks if a path looks like a Windows named pipe (\\.\pipe\...). diff --git a/internal/shim/task/service.go b/internal/shim/task/service.go index b56ee17f..32fdb033 100644 --- a/internal/shim/task/service.go +++ b/internal/shim/task/service.go @@ -84,11 +84,26 @@ func NewTaskService(ctx context.Context, sb sandbox.Sandbox, publisher shim.Publ type container struct { ioShutdown func(context.Context) error + // ioDone is closed when the host-side copy goroutines for the init + // process have fully drained output to the destination FIFO. + ioDone <-chan struct{} + // stdinEOF, when non-nil, signals the host stdin goroutine to stop + // reading the FIFO and send OP_SHUTDOWN(SEND) in-order on the stdin + // stream. Called by CloseIO instead of forwarding the RPC out-of-band, + // guaranteeing the EOF arrives after all in-flight stdin bytes. + stdinEOF func() error // forwarder is the UNIX socket forwarder for this specific container. forwarder *socketForwarder execShutdowns map[string]func(context.Context) error + // execIODone holds the ioDone channel for each exec's host-side copy + // goroutines. The host Wait handler blocks on this before returning so + // that all output bytes are guaranteed to be in the destination FIFO + // before the caller can issue Delete. + execIODone map[string]<-chan struct{} + // execStdinEOF holds the in-band stdin EOF sender per exec ID. + execStdinEOF map[string]func() error } // shutdown shuts down the container's IO streams, socket forwarding, and all @@ -384,7 +399,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * Terminal: r.Terminal, } - cio, ioShutdown, err := s.forwardIO(ctx, s.sb, r.ID, rio) + cio, ioShutdown, initIODone, initStdinEOF, err := s.forwardIO(ctx, s.sb, r.ID, rio) if err != nil { return nil, errgrpc.ToGRPC(err) } @@ -406,7 +421,11 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * preCreate := time.Now() c := &container{ ioShutdown: ioShutdown, + ioDone: initIODone, + stdinEOF: initStdinEOF, execShutdowns: make(map[string]func(context.Context) error), + execIODone: make(map[string]<-chan struct{}), + execStdinEOF: make(map[string]func() error), } tc := taskAPI.NewTTRPCTaskClient(vmc) @@ -505,6 +524,8 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP log.G(ctx).WithError(err).WithField("exec", r.ExecID).Error("failed to shutdown exec io after delete") } delete(c.execShutdowns, r.ExecID) + delete(c.execIODone, r.ExecID) + delete(c.execStdinEOF, r.ExecID) } } else { if err := c.shutdown(ctx); err != nil { @@ -534,7 +555,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty Terminal: r.Terminal, } - cio, ioShutdown, err := s.forwardIO(ctx, s.sb, r.ID+"-"+r.ExecID, rio) + cio, ioShutdown, ioDone, stdinEOF, err := s.forwardIO(ctx, s.sb, r.ID+"-"+r.ExecID, rio) if err != nil { return nil, errgrpc.ToGRPC(err) } @@ -543,6 +564,12 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty c, ok := s.containers[r.ID] if ok && ioShutdown != nil { c.execShutdowns[r.ExecID] = ioShutdown + if ioDone != nil { + c.execIODone[r.ExecID] = ioDone + } + if stdinEOF != nil { + c.execStdinEOF[r.ExecID] = stdinEOF + } } s.mu.Unlock() @@ -665,6 +692,35 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi // CloseIO of a process func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { log.G(ctx).WithFields(log.Fields{"id": r.ID, "exec": r.ExecID, "stdin": r.Stdin}).Info("close io") + if r.Stdin { + // Deliver stdin EOF in-band on the stream connection rather than + // forwarding the RPC out-of-band. The in-band CloseWrite sends + // OP_SHUTDOWN(SEND) ordered after all data already written to the + // stream, preventing truncation caused by an out-of-band RPC on a + // separate vsock connection racing in-flight stdin bytes. + s.mu.Lock() + var stdinEOF func() error + if c, ok := s.containers[r.ID]; ok { + if r.ExecID != "" { + stdinEOF = c.execStdinEOF[r.ExecID] + } else { + stdinEOF = c.stdinEOF + } + } + s.mu.Unlock() + + if stdinEOF != nil { + if err := stdinEOF(); err != nil { + log.G(ctx).WithError(err).WithFields(log.Fields{ + "id": r.ID, + "exec": r.ExecID, + }).Error("failed to send stdin EOF") + return nil, errgrpc.ToGRPC(err) + } + return empty, nil + } + } + // Non-stdin CloseIO or no stream registered: forward to the guest. vmc, err := s.sb.Client() if err != nil { return nil, errgrpc.ToGRPC(err) @@ -707,7 +763,46 @@ func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.Wa return nil, errgrpc.ToGRPC(err) } tc := taskAPI.NewTTRPCTaskClient(vmc) - return tc.Wait(ctx, r) + resp, err := tc.Wait(ctx, r) + if err != nil { + return nil, err + } + // Block until the host-side copy goroutines have fully drained output + // to the destination FIFO before returning the exit status to the + // caller. This guarantees that by the time the caller sees the Wait + // response, all process output is in the FIFO — so a subsequent + // Delete cannot race with in-flight bytes still buffered in the + // VM→host vsock transport. + // + // This mirrors the guarantee runc shims provide natively: with runc + // the process output pipe is local, so EOF on the FIFO is synchronous + // with process exit. With nerdbox the output travels through a + // vsock→Unix socket bridge, introducing a transport lag that the + // Wait response must account for. + s.mu.Lock() + var ioDone <-chan struct{} + if c, ok := s.containers[r.ID]; ok { + if r.ExecID != "" { + ioDone = c.execIODone[r.ExecID] + } else { + ioDone = c.ioDone + } + } + s.mu.Unlock() + + if ioDone != nil { + drainCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) + defer cancel() + select { + case <-ioDone: + case <-drainCtx.Done(): + log.G(ctx).WithError(drainCtx.Err()).WithFields(log.Fields{ + "id": r.ID, + "exec": r.ExecID, + }).Warn("timed out waiting for IO drain after wait") + } + } + return resp, nil } // Connect returns shim information such as the shim's pid diff --git a/internal/vminit/process/io.go b/internal/vminit/process/io.go index da9e5d06..342f421d 100644 --- a/internal/vminit/process/io.go +++ b/internal/vminit/process/io.go @@ -54,6 +54,20 @@ var bufPool = sync.Pool{ }, } +// StreamWriteCloser is the interface that vsock stream connections must +// implement. CloseWrite sends OP_SHUTDOWN(SEND) in-order after all data, +// signalling the peer to drain and deliver EOF without a destructive +// transport close. The transport is closed later (at delete/shutdown) via +// Close, by which point the peer has already fully drained. +// +// Asserting this interface at setup time (rather than per-call) ensures a +// future wrapper type that inadvertently drops CloseWrite causes a loud +// setup failure instead of silently hanging on the peer's read-to-EOF. +type StreamWriteCloser interface { + io.ReadWriteCloser + CloseWrite() error +} + type processIO struct { io runc.IO @@ -65,15 +79,27 @@ type processIO struct { } func (p *processIO) Close() error { + var errs []error if p.io != nil { - return p.io.Close() + if err := p.io.Close(); err != nil { + errs = append(errs, err) + } } + // Always close the vsock stream connections regardless of whether a + // runc PipeIO is also present. In the stream scheme both co-exist + // (processIO.io = PipeIO, processIO.streams = vsock conns), and the + // goroutines only call CloseWrite (not Close) on the stream conns, + // deferring transport teardown here so it happens after the peer has + // drained. The dedup guard (i != 2 || s != p.streams[1]) ensures the + // shared stdout==stderr conn is closed exactly once. for i, s := range p.streams { if s != nil && (i != 2 || s != p.streams[1]) { - s.Close() + if err := s.Close(); err != nil { + errs = append(errs, err) + } } } - return nil + return errors.Join(errs...) } func (p *processIO) IO() runc.IO { @@ -167,49 +193,11 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, s for _, i := range []struct { name string index int - dest func(wc io.WriteCloser, rc io.Closer) + src func() io.Reader + label string }{ - { - name: stdout, - index: 1, - dest: func(wc io.WriteCloser, rc io.Closer) { - wg.Add(1) - cwg.Add(1) - go func() { - cwg.Done() - p := bufPool.Get().(*[]byte) - defer bufPool.Put(p) - if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil { - log.G(ctx).WithError(err).Warn("error copying stdout") - } - wg.Done() - wc.Close() - if rc != nil { - rc.Close() - } - }() - }, - }, { - name: stderr, - index: 2, - dest: func(wc io.WriteCloser, rc io.Closer) { - wg.Add(1) - cwg.Add(1) - go func() { - cwg.Done() - p := bufPool.Get().(*[]byte) - defer bufPool.Put(p) - if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil { - log.G(ctx).WithError(err).Warn("error copying stderr") - } - wg.Done() - wc.Close() - if rc != nil { - rc.Close() - } - }() - }, - }, + {name: stdout, index: 1, src: func() io.Reader { return rio.Stdout() }, label: "stdout"}, + {name: stderr, index: 2, src: func() io.Reader { return rio.Stderr() }, label: "stderr"}, } { if i.name == "" { continue @@ -219,11 +207,41 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, s fr io.Closer ) if streams[i.index] != nil { - if streams[i.index] == nil { - continue + // Assert the vsock stream connection implements CloseWrite early, + // at setup time. CloseWrite sends OP_SHUTDOWN(SEND) in-order after + // all data so the host drains gracefully; the transport Close is + // deferred to processIO.Close() at delete time. Failing here loudly + // prevents a silent hang if a future wrapper drops CloseWrite. + sc, ok := streams[i.index].(StreamWriteCloser) + if !ok { + return nil, fmt.Errorf("stream connection for %s does not implement CloseWrite; vsock conn required", i.label) } - fw = streams[i.index] + src := i.src + label := i.label + wg.Add(1) + cwg.Add(1) + go func() { + cwg.Done() + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + if _, err := io.CopyBuffer(sc, src(), *p); err != nil { + log.G(ctx).WithError(err).Warnf("error copying %s", label) + } + // CloseWrite sends OP_SHUTDOWN(SEND) ordered after all data, + // signalling the host to drain and deliver EOF to the FIFO. + // Do NOT Close the transport here; that is deferred to + // processIO.Close() at delete time, which runs after the host + // has confirmed receipt (the Wait-drain gate ensures this). + if err := sc.CloseWrite(); err != nil { + log.G(ctx).WithError(err).Warnf("error closing %s stream", label) + } + wg.Done() + }() + continue } else { + // Non-stream path: fifo or plain file. Uses a simple raw copy + // with transport close in the goroutine (no CloseWrite needed + // for local fifos/files). ok, err := fifo.IsFifo(i.name) if err != nil { return nil, err @@ -238,18 +256,35 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, s } else { if sameFile != nil { sameFile.bumpCount(1) - i.dest(sameFile, nil) - continue + fw = sameFile + } else { + if fw, err = os.OpenFile(i.name, syscall.O_WRONLY|syscall.O_APPEND, 0); err != nil { + return nil, fmt.Errorf("containerd-shim: opening file %q failed: %w", i.name, err) + } + if stdout == stderr { + sameFile = newCountingWriteCloser(fw, 1) + fw = sameFile + } } - if fw, err = os.OpenFile(i.name, syscall.O_WRONLY|syscall.O_APPEND, 0); err != nil { - return nil, fmt.Errorf("containerd-shim: opening file %q failed: %w", i.name, err) + } + src := i.src + label := i.label + wg.Add(1) + cwg.Add(1) + go func() { + cwg.Done() + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + if _, err := io.CopyBuffer(fw, src(), *p); err != nil { + log.G(ctx).WithError(err).Warnf("error copying %s", label) } - if stdout == stderr { - sameFile = newCountingWriteCloser(fw, 1) + wg.Done() + fw.Close() + if fr != nil { + fr.Close() } - } + }() } - i.dest(fw, fr) } if stdin == "" { return nil, nil diff --git a/internal/vminit/runc/container.go b/internal/vminit/runc/container.go index 0df5aebd..75a4a839 100644 --- a/internal/vminit/runc/container.go +++ b/internal/vminit/runc/container.go @@ -392,15 +392,11 @@ func (c *Container) Kill(ctx context.Context, r *task.KillRequest) error { // CloseIO of a process func (c *Container) CloseIO(ctx context.Context, r *task.CloseIORequest) error { - p, err := c.Process(r.ExecID) - if err != nil { - return err - } - if stdin := p.Stdin(); stdin != nil { - if err := stdin.Close(); err != nil { - return fmt.Errorf("close stdin: %w", err) - } - } + // Stdin EOF is delivered in-band by the host via OP_SHUTDOWN(SEND) on + // the vsock stdin stream (triggered by the host's CloseIO handler). + // The host no longer forwards this RPC over the out-of-band ttrpc + // connection to avoid a race between the RPC and in-flight stdin bytes + // on the stream connection. This handler is therefore a no-op. return nil } diff --git a/internal/vminit/runc/platform.go b/internal/vminit/runc/platform.go index 87f73805..67741d0e 100644 --- a/internal/vminit/runc/platform.go +++ b/internal/vminit/runc/platform.go @@ -108,19 +108,29 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console switch uri.Scheme { case "stream": - out, err := p.streams.Get(strings.TrimPrefix(stdout, "stream://")) + raw, err := p.streams.Get(strings.TrimPrefix(stdout, "stream://")) if err != nil { return nil, err } + // Assert CloseWrite at setup time so a future wrapper that drops it + // causes an immediate failure rather than a silent hang. + sc, ok := raw.(process.StreamWriteCloser) + if !ok { + return nil, fmt.Errorf("console stdout stream connection does not implement CloseWrite; vsock conn required") + } wg.Add(1) cwg.Add(1) go func() { cwg.Done() buf := bufPool.Get().(*[]byte) defer bufPool.Put(buf) - io.CopyBuffer(out, epollConsole, *buf) - - out.Close() + io.CopyBuffer(sc, epollConsole, *buf) + // CloseWrite sends OP_SHUTDOWN(SEND) in-order after all data. + // Do NOT Close the transport here; deferred to processIO.Close. + if err := sc.CloseWrite(); err != nil { + // Non-fatal: log only; transport will be closed at delete. + _ = err + } wg.Done() }() cwg.Wait() diff --git a/plugins/vminit/streaming/plugin_test.go b/plugins/vminit/streaming/plugin_test.go index 1ab72cf8..4b0e9333 100644 --- a/plugins/vminit/streaming/plugin_test.go +++ b/plugins/vminit/streaming/plugin_test.go @@ -372,12 +372,12 @@ func TestBidirectionalStreaming(t *testing.T) { // or allocates unbounded memory. func FuzzRecv(f *testing.F) { // Seed with interesting cases - f.Add([]byte{}) // empty - f.Add([]byte{0, 0, 0, 0}) // zero-length frame (EOF) - f.Add([]byte{0, 0, 0, 5, 1, 2, 3, 4, 5}) // valid 5-byte frame - f.Add([]byte{0, 0, 0, 1, 0xff}) // 1-byte frame + f.Add([]byte{}) // empty + f.Add([]byte{0, 0, 0, 0}) // zero-length frame (EOF) + f.Add([]byte{0, 0, 0, 5, 1, 2, 3, 4, 5}) // valid 5-byte frame + f.Add([]byte{0, 0, 0, 1, 0xff}) // 1-byte frame f.Add([]byte{0xff, 0xff, 0xff, 0xff}) // max uint32 length - f.Add([]byte{0, 0, 0, 3, 1, 2}) // truncated data + f.Add([]byte{0, 0, 0, 3, 1, 2}) // truncated data f.Add([]byte{0, 0, 0, 1}) // length but no data f.Add([]byte{0, 0, 0, 5, 10, 5, 116, 101, 115, 116}) // valid proto Any diff --git a/test/shim/shim_test.go b/test/shim/shim_test.go index d6b21340..d9e8ab26 100644 --- a/test/shim/shim_test.go +++ b/test/shim/shim_test.go @@ -38,7 +38,7 @@ import ( "github.com/containerd/typeurl/v2" "github.com/opencontainers/runtime-spec/specs-go" - "github.com/dmcgowan/shimtest" + "github.com/containerd/shimtest" ) const shimBinaryName = "containerd-shim-nerdbox-v1" diff --git a/test/stress/stress_test.go b/test/stress/stress_test.go index af00c0de..fc4a85c1 100644 --- a/test/stress/stress_test.go +++ b/test/stress/stress_test.go @@ -39,7 +39,7 @@ import ( "github.com/containerd/typeurl/v2" "github.com/opencontainers/runtime-spec/specs-go" - "github.com/dmcgowan/shimtest" + "github.com/containerd/shimtest" ) const shimBinaryName = "containerd-shim-nerdbox-v1" diff --git a/test/testbin/main.go b/test/testbin/main.go index a67e026a..22273bf9 100644 --- a/test/testbin/main.go +++ b/test/testbin/main.go @@ -20,10 +20,10 @@ // shim and stress test suites can embed it in their container rootfs images. // // This main is intentionally a one-liner: all logic lives in the importable -// github.com/dmcgowan/shimtest/testbin package so it is covered by +// github.com/containerd/shimtest/testbin package so it is covered by // go mod vendor and stays in sync with the vendored shimtest version. package main -import "github.com/dmcgowan/shimtest/testbin" +import "github.com/containerd/shimtest/testbin" func main() { testbin.Main() } diff --git a/vendor/github.com/dmcgowan/shimtest/.gitignore b/vendor/github.com/containerd/shimtest/.gitignore similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/.gitignore rename to vendor/github.com/containerd/shimtest/.gitignore diff --git a/vendor/github.com/containerd/shimtest/.golangci.yml b/vendor/github.com/containerd/shimtest/.golangci.yml new file mode 100644 index 00000000..afd57b55 --- /dev/null +++ b/vendor/github.com/containerd/shimtest/.golangci.yml @@ -0,0 +1,54 @@ +version: "2" +linters: + enable: + - copyloopvar # Checks for loop variable copies in Go 1.22+ + - dupword # Checks for duplicate words in the source code + - misspell + - nolintlint + - revive + - unconvert + disable: + - errcheck + - unused + settings: + revive: + rules: + - name: package-comments + severity: warning + disabled: true + exclude: [""] + nolintlint: + allow-unused: true + exclusions: + generated: lax + rules: + - linters: + - revive + text: if-return + - linters: + - revive + text: empty-block + - linters: + - revive + text: superfluous-else + - linters: + - revive + text: unused-parameter + - linters: + - revive + text: unreachable-code + - linters: + - revive + text: redefines-builtin-id + - linters: + - staticcheck + text: "SA4010" # append result unused in intentional infinite memory-hog loop +issues: + max-issues-per-linter: 0 + max-same-issues: 0 +formatters: + enable: + - gofmt + - goimports + exclusions: + generated: strict diff --git a/vendor/github.com/containerd/shimtest/AGENTS.md b/vendor/github.com/containerd/shimtest/AGENTS.md new file mode 100644 index 00000000..4d55ad91 --- /dev/null +++ b/vendor/github.com/containerd/shimtest/AGENTS.md @@ -0,0 +1,64 @@ +# shimtest agent rules + +shimtest is a conformance test suite for the containerd shim API. The tests +define how the shim API **must** be used and verify that shim implementations +comply with the API contract. + +## Required validation after every change + +After **any** change to Go source, `go.mod`, or `go.sum`, run all of the +following and fix every failure before considering the work done: + +```sh +# 1. Build +go build ./... + +# 2. Linters +golangci-lint run ./... + +# 3. Verify formatting +gofmt -l . # must print nothing +``` + +## Commit messages + +- Subject line must be ≤ 72 characters. +- Every commit must include a `Signed-off-by` trailer (DCO). Use + `git commit -s` to add it automatically. +- Commits must not have a `From:` line that differs from the `Author:` field. + +## Adding new tests + +When adding a new test or benchmark: + +1. **Update the README table.** Add a row to the appropriate table in + `README.md` (Tests, Benchmarks, or Planned tests) with the test name, + feature flag (if any), and a concise description of what the test + verifies at the API level. + +2. **Focus on the shim API contract.** Tests and their comments must describe + what the shim API requires, not how a particular implementation behaves. + - Write comments in terms of the API specification: what the caller must + send, what the shim must return, and what invariants must hold. + - Do not reference specific shim implementations (e.g., runc, nerdbox) in + test logic, test names, or comments. + - Do not add tests that are designed to catch a known bug in a specific + implementation. If a bug motivates a test, the test should be framed + as an API conformance requirement, not a regression for that + implementation. + - Avoid comments like "this catches the nerdbox close-before-drain race" + or "runc gets this wrong". Prefer "the shim must deliver all buffered + output before the wait response is returned". + +3. **Keep test names implementation-neutral.** Test names appear in the + README table and in test output seen by all shim implementors. Names like + `FastExitOutput` are good; names like `RuncDrainRace` are not. + +## Code style + +- Follow standard Go conventions (`gofmt`, `goimports`). +- Test helpers shared between suites belong in `helpers.go`. +- Each test suite lives in its own `*_suite.go` file. +- The `testbin` package (`testbin/`) contains the guest-side multicall + binary; add new guest commands there when a test requires custom + in-container behaviour. diff --git a/vendor/github.com/dmcgowan/shimtest/LICENSE b/vendor/github.com/containerd/shimtest/LICENSE similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/LICENSE rename to vendor/github.com/containerd/shimtest/LICENSE diff --git a/vendor/github.com/containerd/shimtest/MAINTAINERS b/vendor/github.com/containerd/shimtest/MAINTAINERS new file mode 100644 index 00000000..1371436e --- /dev/null +++ b/vendor/github.com/containerd/shimtest/MAINTAINERS @@ -0,0 +1,5 @@ +# shimtest maintainers +# +# shimtest is part of the nerdbox non-core sub-project of containerd. +# Maintainers are listed in the nerdbox MAINTAINERS file: +# https://github.com/containerd/nerdbox/blob/main/MAINTAINERS diff --git a/vendor/github.com/dmcgowan/shimtest/Makefile b/vendor/github.com/containerd/shimtest/Makefile similarity index 71% rename from vendor/github.com/dmcgowan/shimtest/Makefile rename to vendor/github.com/containerd/shimtest/Makefile index 60d8dba8..1bca9f4f 100644 --- a/vendor/github.com/dmcgowan/shimtest/Makefile +++ b/vendor/github.com/containerd/shimtest/Makefile @@ -1,3 +1,17 @@ +# Copyright The containerd Authors. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + GO ?= go GO_BUILDTAGS ?= # On Windows the test binary must have the .exe extension to be executable. diff --git a/vendor/github.com/dmcgowan/shimtest/README.md b/vendor/github.com/containerd/shimtest/README.md similarity index 96% rename from vendor/github.com/dmcgowan/shimtest/README.md rename to vendor/github.com/containerd/shimtest/README.md index e5f3a6ee..c9084a6b 100644 --- a/vendor/github.com/dmcgowan/shimtest/README.md +++ b/vendor/github.com/containerd/shimtest/README.md @@ -5,6 +5,8 @@ lifecycle (create, start, exec, kill, delete), stdio round-trip, clock synchronization across a VM boundary, the transfer service, and UDS socket forwarding. +shimtest is part of the [nerdbox](https://github.com/containerd/nerdbox) non-core sub-project of [containerd](https://github.com/containerd). + ## Prerequisites - A built shim binary (e.g., `containerd-shim-runc-v2` or @@ -92,6 +94,7 @@ config, the tree is `TestShim//`. | `Lifecycle` | — | Full create/start/kill/wait/delete cycle | | `Exec` | exec | Exec a process inside a running container | | `StdioRoundTrip` | exec | Write to stdin, read from stdout via exec | +| `LargeStdioRoundTrip` | exec | Pipe 20 MiB through stdin→`cat`→stdout via exec; verify full byte count and CRC-32. Catches truncation in the exec stdio pipeline under sustained load | | `Clock` | exec | Verify VM clock is synchronized with host | | `ExitCodes` | exec | Exec processes that exit with a range of status codes and verify propagation | | `InitExitCodes` | — | Run the container's init process with `/bin/exit N` and verify task-level exit status propagation | @@ -131,7 +134,7 @@ Candidates to add later, ranked roughly by value: A subset of these benchmarks runs against `runc-rootless` and `nerdbox` on every push to `main` and is published as time-series charts at - (gh-pages). + (gh-pages). Benchmarks live under `BenchmarkShim//`. @@ -168,7 +171,7 @@ that points at your binary, then run `shimtest.test` against it. - name: Check out shimtest uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: - repository: dmcgowan/shimtest + repository: containerd/shimtest ref: # pin a commit path: shimtest diff --git a/vendor/github.com/dmcgowan/shimtest/config.go b/vendor/github.com/containerd/shimtest/config.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/config.go rename to vendor/github.com/containerd/shimtest/config.go diff --git a/vendor/github.com/dmcgowan/shimtest/connect_unix.go b/vendor/github.com/containerd/shimtest/connect_unix.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/connect_unix.go rename to vendor/github.com/containerd/shimtest/connect_unix.go diff --git a/vendor/github.com/dmcgowan/shimtest/connect_windows.go b/vendor/github.com/containerd/shimtest/connect_windows.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/connect_windows.go rename to vendor/github.com/containerd/shimtest/connect_windows.go diff --git a/vendor/github.com/dmcgowan/shimtest/events.go b/vendor/github.com/containerd/shimtest/events.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/events.go rename to vendor/github.com/containerd/shimtest/events.go diff --git a/vendor/github.com/dmcgowan/shimtest/exec_bench.go b/vendor/github.com/containerd/shimtest/exec_bench.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/exec_bench.go rename to vendor/github.com/containerd/shimtest/exec_bench.go diff --git a/vendor/github.com/dmcgowan/shimtest/exec_suite.go b/vendor/github.com/containerd/shimtest/exec_suite.go similarity index 54% rename from vendor/github.com/dmcgowan/shimtest/exec_suite.go rename to vendor/github.com/containerd/shimtest/exec_suite.go index 15e6a279..346562fb 100644 --- a/vendor/github.com/dmcgowan/shimtest/exec_suite.go +++ b/vendor/github.com/containerd/shimtest/exec_suite.go @@ -31,6 +31,7 @@ import ( "time" taskAPI "github.com/containerd/containerd/api/runtime/task/v3" + tasktypes "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/v2/pkg/namespaces" "github.com/containerd/ttrpc" typeurl "github.com/containerd/typeurl/v2" @@ -43,7 +44,6 @@ import ( // hashverify. type ExecSuite struct { cfg Config - } // NewExecSuite constructs an ExecSuite from the given options. @@ -57,11 +57,15 @@ func (s *ExecSuite) Run(t *testing.T) { registerShimLeakCheck(t, s.cfg.ShimBinary) t.Run("Exec", s.testExec) t.Run("StdioRoundTrip", s.testStdioRoundTrip) + t.Run("LargeStdioRoundTrip", s.testLargeStdioRoundTrip) t.Run("Clock", s.testClock) t.Run("ExitCodes", s.testExitCodes) t.Run("LargeFileRead", s.testLargeFileRead) t.Run("BindMountRead", s.testBindMountRead) t.Run("FastExitOutput", s.testFastExitOutput) + t.Run("ExecOutputDrainAfterExit", s.testExecOutputDrainAfterExit) + t.Run("ExecDiscardIO", s.testExecDiscardIO) + t.Run("ExecCommandNotFound", s.testExecCommandNotFound) } // TestExec runs `echo execworks` inside a container and verifies the @@ -559,6 +563,197 @@ func (s *ExecSuite) runHashverify(t *testing.T, path, hashHex string, extraMount shutdownTask(ctx, tc, cid) } +// testLargeStdioRoundTrip pipes 20 MiB through stdin → /bin/cat → +// stdout and verifies the full byte count and CRC-32 checksum on the +// way out. It uses the same deterministic 0x00..0xff tile payload as +// the FastExit tests. +// +// This test asserts that a conforming shim delivers all bytes written +// to exec stdin to the process and relays all bytes written by the +// process to stdout back to the host without truncation under sustained +// load. Two API contracts are verified: +// +// - All stdin bytes must be forwarded to the process before the stdin +// connection is closed. A shim that drops buffered bytes on the +// stdin close path will produce truncated output from cat. +// +// - All stdout bytes produced by the process must be delivered to the +// host before the stdout connection is closed. A shim that drops +// buffered bytes when the process exits will produce truncated +// output. +// +// Stdin is closed via the CloseIO RPC, not by simply closing the +// client-side FIFO write end. This matches how containerd itself signals +// EOF: the shim holds its own write-end reference on the stdin FIFO (to +// unblock its internal O_RDONLY open) and only releases it when it +// receives a CloseIO request. Closing the test's write end alone leaves +// the shim's reference open, so cat never sees EOF. The CloseIO RPC +// instructs the shim to drop its reference, delivering EOF to the +// process — exactly the protocol used by `ctr exec`. +// +// A third API contract is also verified: the shim must close the exec's +// stdout connection once the process exits and its output has been +// flushed, without waiting for the caller to issue Delete. The test +// waits for stdout EOF before calling Delete; a shim that defers the +// stdout close until Delete will time out here. +// +// Both failure modes are detected by the byte-count and CRC-32 +// assertions at the end of the test. +func (s *ExecSuite) testLargeStdioRoundTrip(t *testing.T) { + shimBin, bundleDir, rootfsMounts := shimSetup(t, s.cfg) + containerID := containerID(t) + + createOCISpec(t, bundleDir, []string{"/bin/forever"}, s.cfg) + + stdoutPath, stderrPath := createIOFifos(t, bundleDir) + ns := uniqueTestNamespace(t, "exec") + ctx := namespaces.WithNamespace(t.Context(), ns) + + params := startShim(t, shimBin, bundleDir, containerID, ns, s.cfg) + conn := connectShim(t, params.Address) + client := ttrpc.NewClient(conn) + defer client.Close() + + tc := taskAPI.NewTTRPCTaskClient(client) + + drainFifo(t, ctx, stdoutPath) + drainFifo(t, ctx, stderrPath) + + if _, err := tc.Create(ctx, newCreateTaskRequest(t, containerID, bundleDir, stdoutPath, stderrPath, rootfsMounts)); err != nil { + t.Fatal("create failed:", err) + } + if _, err := tc.Start(ctx, &taskAPI.StartRequest{ID: containerID}); err != nil { + t.Fatal("start failed:", err) + } + + execID := "large-rt" + execDir := t.TempDir() + execStdin, execStdout, execStderr := createStdioFifos(t, execDir) + + stdoutReader, err := openPipeReader(ctx, execStdout) + if err != nil { + t.Fatal("open stdout reader:", err) + } + t.Cleanup(func() { stdoutReader.Close() }) + + h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + var byteCount int64 + outDone := make(chan struct{}) + go func() { + defer close(outDone) + byteCount, _ = io.Copy(h, stdoutReader) + }() + + drainFifo(t, ctx, execStderr) + + stdinFifo, err := openPipeWriter(ctx, execStdin) + if err != nil { + t.Fatal("open stdin pipe:", err) + } + + procSpec, err := typeurl.MarshalAnyToProto(&specs.Process{ + Args: []string{"/bin/cat"}, + Cwd: "/", + Env: []string{"PATH=/bin:/usr/bin"}, + }) + if err != nil { + t.Fatal("marshal exec spec:", err) + } + + if _, err := tc.Exec(ctx, &taskAPI.ExecProcessRequest{ + ID: containerID, + ExecID: execID, + Spec: procSpec, + Stdin: execStdin, + Stdout: execStdout, + Stderr: execStderr, + }); err != nil { + t.Fatal("exec failed:", err) + } + if _, err := tc.Start(ctx, &taskAPI.StartRequest{ID: containerID, ExecID: execID}); err != nil { + t.Fatal("exec start failed:", err) + } + + // Write the tiled payload to stdin in a background goroutine. + // The write and the stdout drain run concurrently so that neither + // the stdin FIFO nor cat's stdout pipe can fill and deadlock. + writeDone := make(chan error, 1) + go func() { + _, err := io.Copy(stdinFifo, io.LimitReader(&infiniteTileReader{}, int64(largeStdioPayloadSize))) + stdinFifo.Close() + writeDone <- err + }() + + // Wait for all stdin to be written. + select { + case err := <-writeDone: + if err != nil { + t.Fatal("write to stdin failed:", err) + } + case <-time.After(90 * time.Second): + t.Fatal("timed out writing stdin payload") + } + + // Signal EOF to the in-container process via the CloseIO RPC. + // + // Closing the client-side FIFO write end alone is not sufficient: + // the shim holds its own write-end reference on the stdin FIFO + // (opened in openStdin to unblock the shim's internal O_RDONLY + // open) and releases it only when it receives a CloseIO request. + // This matches the protocol used by containerd and `ctr exec`: + // the client calls CloseIO after it is done writing stdin, which + // causes the shim to close its write reference and deliver EOF to + // the process. + if _, err := tc.CloseIO(ctx, &taskAPI.CloseIORequest{ + ID: containerID, + ExecID: execID, + Stdin: true, + }); err != nil { + t.Fatal("CloseIO failed:", err) + } + + // Wait for cat to exit (it exits once stdin reaches EOF). + waitResp, err := tc.Wait(ctx, &taskAPI.WaitRequest{ID: containerID, ExecID: execID}) + if err != nil { + t.Fatal("exec wait failed:", err) + } + if waitResp.ExitStatus != 0 { + t.Fatalf("cat exited with status %d", waitResp.ExitStatus) + } + + // Wait for the stdout reader to reach EOF before calling Delete. + // + // A conforming shim must close the exec's stdout connection once + // the process exits and its output has been flushed — it must not + // defer closing stdout until Delete is called. Waiting here before + // Delete tests that contract: if the shim closes stdout promptly on + // process exit, outDone fires quickly; if the shim holds stdout + // open until Delete, this will time out. + select { + case <-outDone: + case <-time.After(30 * time.Second): + t.Fatal("timed out waiting for stdout drain after process exit") + } + + if _, err := tc.Delete(ctx, &taskAPI.DeleteRequest{ID: containerID, ExecID: execID}); err != nil { + t.Fatal("exec delete failed:", err) + } + + if byteCount != int64(largeStdioPayloadSize) { + t.Fatalf("got %d bytes, want %d (truncated by %d bytes)", + byteCount, largeStdioPayloadSize, int64(largeStdioPayloadSize)-byteCount) + } + gotCRC := h.Sum32() + if wantCRC := tiledPayloadCRC32(largeStdioPayloadSize); gotCRC != wantCRC { + t.Fatalf("CRC mismatch: got %08x, want %08x (data corrupted)", gotCRC, wantCRC) + } + t.Logf("ok — %d bytes, CRC %08x", byteCount, gotCRC) + + tc.Kill(ctx, &taskAPI.KillRequest{ID: containerID, Signal: uint32(syscall.SIGKILL), All: true}) + tc.Wait(ctx, &taskAPI.WaitRequest{ID: containerID}) + tc.Delete(ctx, &taskAPI.DeleteRequest{ID: containerID}) + tc.Shutdown(ctx, &taskAPI.ShutdownRequest{ID: containerID}) +} // burstPayloadSize is the number of bytes written by /bin/burstexit in // the FastExitOutput and FastExitInit tests. It must be large enough @@ -572,17 +767,32 @@ func (s *ExecSuite) runHashverify(t *testing.T, path, hashHex string, extraMount // io.CopyBuffer. const burstPayloadSize = 8 * 1024 * 1024 -// burstExpectedCRC32 returns the Castagnoli CRC-32 of the deterministic -// byte stream produced by /bin/burstexit: a repeating 0x00..0xff tile -// of exactly burstPayloadSize bytes. -func burstExpectedCRC32() uint32 { - tile := make([]byte, 256) +// largeStdioPayloadSize is the number of bytes piped through the +// LargeStdioRoundTrip test. Larger than burstPayloadSize to stress the +// shim's in-flight buffering more aggressively. +const largeStdioPayloadSize = 20 * 1024 * 1024 + +// tiledPayload builds a repeating 0x00..0xff tiled payload of the given +// size. This matches the byte stream produced by /bin/burstexit and is +// shared between the exec and stress suite tests. +func tiledPayload(size int) []byte { + payload := make([]byte, size) + for i := range payload { + payload[i] = byte(i % 256) + } + return payload +} + +// tiledPayloadCRC32 returns the Castagnoli CRC-32 of a tiled payload of +// the given size without allocating the full payload. Used when only the +// expected checksum is needed (e.g. verifying /bin/burstexit output). +func tiledPayloadCRC32(size int) uint32 { + var tile [256]byte for i := range tile { tile[i] = byte(i) } h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) - remaining := burstPayloadSize - for remaining > 0 { + for remaining := size; remaining > 0; { n := remaining if n > len(tile) { n = len(tile) @@ -593,6 +803,25 @@ func burstExpectedCRC32() uint32 { return h.Sum32() } +// burstExpectedCRC32 returns the Castagnoli CRC-32 of the deterministic +// tiled payload of exactly burstPayloadSize bytes. +func burstExpectedCRC32() uint32 { + return tiledPayloadCRC32(burstPayloadSize) +} + +// infiniteTileReader emits an infinite repeating 0x00..0xff tile stream. +// Wrap with io.LimitReader to produce a payload of exactly n bytes without +// allocating the full buffer. +type infiniteTileReader struct{ off int64 } + +func (r *infiniteTileReader) Read(p []byte) (int, error) { + for i := range p { + p[i] = byte((r.off + int64(i)) % 256) + } + r.off += int64(len(p)) + return len(p), nil +} + // testFastExitOutput execs /bin/burstexit (writes 8 MiB then exits) // inside a long-running container, then immediately shuts down the // shim without waiting for the exec to finish or calling Delete. @@ -710,3 +939,283 @@ func (s *ExecSuite) testFastExitOutput(t *testing.T) { } t.Logf("ok — %d bytes, CRC %08x", len(got), gotCRC) } + +// testExecOutputDrainAfterExit verifies that a conforming shim promptly +// closes the exec process's stdout connection when the process exits, +// allowing Shutdown to complete without waiting for the 30 s ioShutdown +// fallback. +// +// The init container is started with null (empty) stdout/stderr so that +// init's stdio shutdown completes instantly and does not consume the +// shared 30 s shutdown deadline. This isolates the exec's stdio close +// as the sole determinant of total Shutdown duration. +// +// A non-conforming shim that fails to propagate the write-end close of +// the exec's stdout back to the host will leave the host copy goroutine +// blocked until the 30 s fallback fires; Shutdown will exceed the 5 s +// assertion and the test will fail on the duration check. +func (s *ExecSuite) testExecOutputDrainAfterExit(t *testing.T) { + shimBin, bundleDir, rootfsMounts := shimSetup(t, s.cfg) + containerID := containerID(t) + + createOCISpec(t, bundleDir, []string{"/bin/forever"}, s.cfg) + + // Start the init container with null stdout/stderr so its stdio + // shutdown completes instantly. If we used real FIFOs, /bin/forever + // (which never writes) would hold stdout open, consuming the shared + // 30 s shutdown deadline before exec's ioShutdown even starts. + ns := uniqueTestNamespace(t, "exec") + ctx := namespaces.WithNamespace(t.Context(), ns) + + params := startShim(t, shimBin, bundleDir, containerID, ns, s.cfg) + conn := connectShim(t, params.Address) + client := ttrpc.NewClient(conn) + defer client.Close() + + tc := taskAPI.NewTTRPCTaskClient(client) + + if _, err := tc.Create(ctx, newCreateTaskRequest(t, containerID, bundleDir, "", "", rootfsMounts)); err != nil { + t.Fatal("create failed:", err) + } + if _, err := tc.Start(ctx, &taskAPI.StartRequest{ID: containerID}); err != nil { + t.Fatal("start failed:", err) + } + + execID := "drain-0" + execDir := t.TempDir() + execStdout, execStderr := createIOFifos(t, execDir) + + var execBuf bytes.Buffer + var execMu sync.Mutex + drainDone := drainFifoIntoDone(t, ctx, execStdout, &execBuf, &execMu) + drainFifo(t, ctx, execStderr) + + procSpec, err := typeurl.MarshalAnyToProto(&specs.Process{ + Args: []string{"/bin/burstexit", strconv.Itoa(burstPayloadSize), "0"}, + Cwd: "/", + Env: []string{"PATH=/bin:/usr/bin"}, + }) + if err != nil { + t.Fatal("marshal exec spec:", err) + } + + if _, err := tc.Exec(ctx, &taskAPI.ExecProcessRequest{ + ID: containerID, + ExecID: execID, + Spec: procSpec, + Stdout: execStdout, + Stderr: execStderr, + }); err != nil { + t.Fatal("exec failed:", err) + } + if _, err := tc.Start(ctx, &taskAPI.StartRequest{ID: containerID, ExecID: execID}); err != nil { + t.Fatal("exec start failed:", err) + } + + // Time the Shutdown call. With null init IO, the only remaining wait + // is exec's stdio shutdown — which is the variable under test. A + // conforming shim closes the exec's stdout connection promptly when + // the process exits; ioDone fires in milliseconds and Shutdown + // returns well within the 5 s bound. + shutdownStart := time.Now() + tc.Shutdown(ctx, &taskAPI.ShutdownRequest{ID: containerID}) + elapsed := time.Since(shutdownStart) + + // 5 s is well above sub-second propagation on a correct shim and well + // below the 30 s shared shutdown-context deadline. + const maxShutdownDuration = 5 * time.Second + if elapsed > maxShutdownDuration { + t.Fatalf("Shutdown blocked for %v (> %v): shim did not propagate exec stdout close — ioShutdown 30s fallback is masking the root cause", + elapsed.Round(time.Millisecond), maxShutdownDuration) + } + + select { + case <-drainDone: + case <-time.After(2 * time.Second): + t.Fatal("stdout FIFO did not drain within 2s after Shutdown returned") + } + + execMu.Lock() + got := execBuf.Bytes() + execMu.Unlock() + + wantCRC := burstExpectedCRC32() + if len(got) != burstPayloadSize { + t.Fatalf("got %d bytes, want %d (truncated by %d bytes)", + len(got), burstPayloadSize, burstPayloadSize-len(got)) + } + gotCRC := crc32.Checksum(got, crc32.MakeTable(crc32.Castagnoli)) + if gotCRC != wantCRC { + t.Fatalf("CRC mismatch: got %08x, want %08x (data corrupted)", gotCRC, wantCRC) + } + t.Logf("ok — %d bytes, CRC %08x", len(got), gotCRC) +} + +// testExecDiscardIO execs a short-lived process with all stdio discarded +// (empty Stdin/Stdout/Stderr), waits for it to exit, then Deletes the exec +// while the container's init keeps running. +func (s *ExecSuite) testExecDiscardIO(t *testing.T) { + shimBin, bundleDir, rootfsMounts := shimSetup(t, s.cfg) + containerID := containerID(t) + + createOCISpec(t, bundleDir, []string{"/bin/forever"}, s.cfg) + + stdoutPath, stderrPath := createIOFifos(t, bundleDir) + ns := uniqueTestNamespace(t, "exec") + ctx := namespaces.WithNamespace(t.Context(), ns) + + params := startShim(t, shimBin, bundleDir, containerID, ns, s.cfg) + conn := connectShim(t, params.Address) + client := ttrpc.NewClient(conn) + defer client.Close() + + tc := taskAPI.NewTTRPCTaskClient(client) + + drainFifo(t, ctx, stdoutPath) + drainFifo(t, ctx, stderrPath) + + if _, err := tc.Create(ctx, newCreateTaskRequest(t, containerID, bundleDir, stdoutPath, stderrPath, rootfsMounts)); err != nil { + t.Fatal("create failed:", err) + } + if _, err := tc.Start(ctx, &taskAPI.StartRequest{ID: containerID}); err != nil { + t.Fatal("start failed:", err) + } + + execID := "discard-io" + + procSpec, err := typeurl.MarshalAnyToProto(&specs.Process{ + Args: []string{"/bin/echo", "discardme"}, + Cwd: "/", + Env: []string{"PATH=/bin:/usr/bin"}, + }) + if err != nil { + t.Fatal("failed to marshal exec spec:", err) + } + + // Deliberately leave Stdin/Stdout/Stderr unset: the exec has no I/O to + // forward, so the shim's forwardIO returns a nil shutdown func. + if _, err := tc.Exec(ctx, &taskAPI.ExecProcessRequest{ + ID: containerID, + ExecID: execID, + Spec: procSpec, + }); err != nil { + t.Fatal("exec failed:", err) + } + if _, err := tc.Start(ctx, &taskAPI.StartRequest{ID: containerID, ExecID: execID}); err != nil { + t.Fatal("exec start failed:", err) + } + + waitResp, err := tc.Wait(ctx, &taskAPI.WaitRequest{ID: containerID, ExecID: execID}) + if err != nil { + t.Fatal("exec wait failed:", err) + } + t.Log("discard-io exec exit status:", waitResp.ExitStatus) + + if _, err := tc.Delete(ctx, &taskAPI.DeleteRequest{ID: containerID, ExecID: execID}); err != nil { + t.Fatal("exec delete failed (shim likely crashed on a nil IO shutdown):", err) + } + + stateResp, err := tc.State(ctx, &taskAPI.StateRequest{ID: containerID}) + if err != nil { + t.Fatal("container State after exec delete failed (shim likely crashed):", err) + } + if stateResp.Status != tasktypes.Status_RUNNING { + t.Fatalf("container is %s after deleting a discarded-IO exec; want RUNNING (shim likely crashed)", stateResp.Status) + } + + tc.Kill(ctx, &taskAPI.KillRequest{ID: containerID, Signal: uint32(syscall.SIGKILL), All: true}) + tc.Wait(ctx, &taskAPI.WaitRequest{ID: containerID}) + tc.Delete(ctx, &taskAPI.DeleteRequest{ID: containerID}) + shutdownTask(ctx, tc, containerID) +} + +// testExecCommandNotFound execs a binary that does not exist in the rootfs and +// verifies two things: the exec start is reported as a failure, and — +// critically — that deleting the failed exec returns promptly instead of +// blocking on the 30 s ioShutdown fallback. +// +// A conforming shim closes the exec's stream connections on the start-failure +// path, so the caller sees EOF immediately and Delete returns in milliseconds. +func (s *ExecSuite) testExecCommandNotFound(t *testing.T) { + shimBin, bundleDir, rootfsMounts := shimSetup(t, s.cfg) + containerID := containerID(t) + + createOCISpec(t, bundleDir, []string{"/bin/forever"}, s.cfg) + + stdoutPath, stderrPath := createIOFifos(t, bundleDir) + ns := uniqueTestNamespace(t, "exec") + ctx := namespaces.WithNamespace(t.Context(), ns) + + params := startShim(t, shimBin, bundleDir, containerID, ns, s.cfg) + conn := connectShim(t, params.Address) + client := ttrpc.NewClient(conn) + defer client.Close() + + tc := taskAPI.NewTTRPCTaskClient(client) + + drainFifo(t, ctx, stdoutPath) + drainFifo(t, ctx, stderrPath) + + if _, err := tc.Create(ctx, newCreateTaskRequest(t, containerID, bundleDir, stdoutPath, stderrPath, rootfsMounts)); err != nil { + t.Fatal("create failed:", err) + } + if _, err := tc.Start(ctx, &taskAPI.StartRequest{ID: containerID}); err != nil { + t.Fatal("start failed:", err) + } + + execID := "notfound-0" + + execStdout, execStderr := createIOFifos(t, t.TempDir()) + drainFifo(t, ctx, execStdout) + drainFifo(t, ctx, execStderr) + + procSpec, err := typeurl.MarshalAnyToProto(&specs.Process{ + Args: []string{"/bin/this-binary-does-not-exist"}, + Cwd: "/", + Env: []string{"PATH=/bin:/usr/bin"}, + }) + if err != nil { + t.Fatal("failed to marshal exec spec:", err) + } + + // Exec (create) succeeds: the shim sets up IO forwarding and registers the + // exec. The failure happens at Start, when the runtime tries to exec the + // missing binary. + if _, err := tc.Exec(ctx, &taskAPI.ExecProcessRequest{ + ID: containerID, + ExecID: execID, + Spec: procSpec, + Stdout: execStdout, + Stderr: execStderr, + }); err != nil { + t.Fatal("exec failed:", err) + } + + if _, err := tc.Start(ctx, &taskAPI.StartRequest{ID: containerID, ExecID: execID}); err == nil { + t.Fatal("expected exec start of a missing binary to fail, got nil error") + } else { + t.Log("exec start failed as expected:", err) + } + + // The regression assertion: deleting the failed exec must not block on the + // 30 s ioShutdown fallback. 5 s is far above sub-second cleanup on a + // conforming shim and far below the 30 s fallback. + const maxDeleteDuration = 5 * time.Second + deleteStart := time.Now() + _, delErr := tc.Delete(ctx, &taskAPI.DeleteRequest{ID: containerID, ExecID: execID}) + elapsed := time.Since(deleteStart) + if elapsed > maxDeleteDuration { + t.Fatalf("exec Delete blocked for %v (> %v): shim did not close the exec stream connections on the start-failure path — ioShutdown 30s fallback is masking the leak", + elapsed.Round(time.Millisecond), maxDeleteDuration) + } + // A Delete error is not the property under test (the exec already failed to + // start); only the absence of the 30 s stall is. Log it for context. + if delErr != nil { + t.Log("exec delete returned:", delErr) + } + + tc.Kill(ctx, &taskAPI.KillRequest{ID: containerID, Signal: uint32(syscall.SIGKILL), All: true}) + tc.Wait(ctx, &taskAPI.WaitRequest{ID: containerID}) + tc.Delete(ctx, &taskAPI.DeleteRequest{ID: containerID}) + shutdownTask(ctx, tc, containerID) +} diff --git a/vendor/github.com/dmcgowan/shimtest/helpers.go b/vendor/github.com/containerd/shimtest/helpers.go similarity index 98% rename from vendor/github.com/dmcgowan/shimtest/helpers.go rename to vendor/github.com/containerd/shimtest/helpers.go index 30edbcc9..86dadf75 100644 --- a/vendor/github.com/dmcgowan/shimtest/helpers.go +++ b/vendor/github.com/containerd/shimtest/helpers.go @@ -538,9 +538,9 @@ func shimPidViaConnect(address, id string, retryFor time.Duration) (int, error) cancel() client.Close() if callErr != nil { - lastErr = fmt.Errorf("Connect RPC: %w", callErr) + lastErr = fmt.Errorf("connect RPC: %w", callErr) } else if resp.ShimPid == 0 { - lastErr = fmt.Errorf("Connect returned ShimPid=0") + lastErr = fmt.Errorf("connect returned ShimPid=0") } else { return int(resp.ShimPid), nil } @@ -572,9 +572,9 @@ func unixSafeDir() string { return os.TempDir() } const ( - afUnixLimit = 104 // macOS AF_UNIX pathname length limit - socketNameLen = 64 // sha256 hex digest - dirOverhead = 14 // "/nb-" prefix + up to 10 random digits + afUnixLimit = 104 // macOS AF_UNIX pathname length limit + socketNameLen = 64 // sha256 hex digest + dirOverhead = 14 // "/nb-" prefix + up to 10 random digits maxBaseLen = afUnixLimit - socketNameLen - dirOverhead // 26 ) if len(os.TempDir()) > maxBaseLen { diff --git a/vendor/github.com/dmcgowan/shimtest/internal/transfer/transfer.go b/vendor/github.com/containerd/shimtest/internal/transfer/transfer.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/internal/transfer/transfer.go rename to vendor/github.com/containerd/shimtest/internal/transfer/transfer.go diff --git a/vendor/github.com/dmcgowan/shimtest/io_unix.go b/vendor/github.com/containerd/shimtest/io_unix.go similarity index 89% rename from vendor/github.com/dmcgowan/shimtest/io_unix.go rename to vendor/github.com/containerd/shimtest/io_unix.go index 39970ead..919486b8 100644 --- a/vendor/github.com/dmcgowan/shimtest/io_unix.go +++ b/vendor/github.com/containerd/shimtest/io_unix.go @@ -163,6 +163,21 @@ func openPipeReader(ctx context.Context, path string) (io.ReadWriteCloser, error return fifo.OpenFifo(ctx, path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) } +// createRawPipeWriter creates a FIFO and returns its path, a WriteCloser +// for the write end (host → shim direction), and a cleanup function. Used +// in contexts without a testing.TB (e.g. runExecRoundTrip in stress_suite.go). +func createRawPipeWriter(dir, name string) (path string, w io.WriteCloser, cleanup func(), err error) { + path = filepath.Join(dir, name) + if err = syscall.Mkfifo(path, 0600); err != nil { + return "", nil, nil, fmt.Errorf("mkfifo %s: %w", path, err) + } + f, err := fifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + if err != nil { + return "", nil, nil, fmt.Errorf("open fifo %s: %w", path, err) + } + return path, f, func() { f.Close() }, nil +} + // createRawPipe creates a FIFO and returns its path, a ReadCloser for // the read end, and a cleanup function. Used in contexts without a // testing.TB (e.g. runOneExec in stress_suite.go). diff --git a/vendor/github.com/dmcgowan/shimtest/io_windows.go b/vendor/github.com/containerd/shimtest/io_windows.go similarity index 75% rename from vendor/github.com/dmcgowan/shimtest/io_windows.go rename to vendor/github.com/containerd/shimtest/io_windows.go index 839a94dd..9a2863ac 100644 --- a/vendor/github.com/dmcgowan/shimtest/io_windows.go +++ b/vendor/github.com/containerd/shimtest/io_windows.go @@ -72,9 +72,22 @@ func pipeName() string { var pipeListeners sync.Map // map[string]net.Listener // pipeCfg is the pipe configuration used for all shimtest named pipes. -// nil uses the default named pipe ACL (current user has full access), -// which is what containerd itself passes to winio.ListenPipe. -var pipeCfg *winio.PipeConfig +// MessageMode is enabled so that the shim can use CloseWrite() (a +// zero-length message) to signal EOF without closing the handle, avoiding +// the Windows byte-mode pipe behaviour where closing the client handle +// discards any unread data in the server's read buffer. +// +// InputBufferSize and OutputBufferSize are set to 64 KiB so that the +// Windows kernel can buffer up to 16 × 4 KiB messages simultaneously. +// The default buffer (4 KiB) equals exactly one 4 KiB message, which +// forces the writer to block after each write until the reader issues +// the next ReadFile, serialising writes and reads and drastically +// reducing throughput. +var pipeCfg = &winio.PipeConfig{ + MessageMode: true, + InputBufferSize: 65536, + OutputBufferSize: 65536, +} // listenPipe creates a named-pipe server at path, stores the listener in // pipeListeners, and registers a cleanup on tb. @@ -147,6 +160,56 @@ func drainFifo(tb testing.TB, _ context.Context, path string) { }() } +// drainFifoIntoDone accepts one connection on the named pipe at path, copies +// all data into buf (protected by mu), and closes the returned channel when +// the connection is closed (i.e. when the write end is done). Use this +// instead of drainFifoInto when the caller needs to block until the pipe is +// fully drained before inspecting buf. +// +// Windows named-pipe note: when the write-end client calls Close(), a pending +// ReadFile on the server side may immediately return (0, ERROR_BROKEN_PIPE) +// even if the pipe buffer still contains unread bytes. A subsequent ReadFile +// WILL return those bytes. This function therefore issues a post-error drain +// loop to recover any residual data before closing the done channel. +func drainFifoIntoDone(tb testing.TB, _ context.Context, path string, buf *bytes.Buffer, mu *sync.Mutex) <-chan struct{} { + tb.Helper() + l := popListener(tb, path) + done := make(chan struct{}) + go func() { + defer close(done) + c, err := l.Accept() + if err != nil { + return + } + defer c.Close() + b := make([]byte, 4096) + for { + n, err := c.Read(b) + if n > 0 { + mu.Lock() + buf.Write(b[:n]) + mu.Unlock() + } + if err != nil { + // The write end was closed (or an error occurred). On Windows, + // the ReadFile that was pending when the client disconnected + // may return (0, error) while data remains in the pipe buffer. + // Drain any such residual bytes before exiting. + for { + n2, _ := c.Read(b) + if n2 == 0 { + return + } + mu.Lock() + buf.Write(b[:n2]) + mu.Unlock() + } + } + } + }() + return done +} + // drainFifoInto accepts one connection on the named pipe at path and copies // data into buf (protected by mu) in a background goroutine. func drainFifoInto(tb testing.TB, _ context.Context, path string, buf *bytes.Buffer, mu *sync.Mutex) { @@ -244,6 +307,33 @@ type rawPipeState struct { pw *io.PipeWriter } +// createRawPipeWriter creates a Windows named-pipe server and returns its +// path, a WriteCloser for the write end (host → shim direction), and a +// cleanup function. The shim connects as the client and reads from the pipe. +// Used in contexts without a testing.TB (e.g. runExecRoundTrip in stress_suite.go). +func createRawPipeWriter(_, name string) (path string, w io.WriteCloser, cleanup func(), err error) { + path = `\\.\pipe\shimtest-raw-` + name + `-` + randomSuffix() + l, err := winio.ListenPipe(path, pipeCfg) + if err != nil { + return "", nil, nil, fmt.Errorf("ListenPipe %s: %w", path, err) + } + pr, pw := io.Pipe() + go func() { + c, err := l.Accept() + if err != nil { + pr.CloseWithError(err) + return + } + defer c.Close() + io.Copy(c, pr) + }() + cleanup = func() { + l.Close() + pw.Close() + } + return path, pw, cleanup, nil +} + // createRawPipe creates a named-pipe server and returns its path, a // ReadCloser for the read end, and a cleanup function. Used in contexts // without a testing.TB (e.g. runOneExec in stress_suite.go). @@ -324,8 +414,8 @@ func dialPipeAsync(path string, timeout time.Duration) <-chan net.Conn { // the deferredPipeConnection pattern in containerd's shim_windows.go. type deferredPipeReader struct { ch <-chan net.Conn - conn net.Conn // set on first Read after dial completes - once sync.Once // ensures we receive from ch exactly once + conn net.Conn // set on first Read after dial completes + once sync.Once // ensures we receive from ch exactly once err error } diff --git a/vendor/github.com/dmcgowan/shimtest/layers_bench.go b/vendor/github.com/containerd/shimtest/layers_bench.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/layers_bench.go rename to vendor/github.com/containerd/shimtest/layers_bench.go diff --git a/vendor/github.com/dmcgowan/shimtest/layers_suite.go b/vendor/github.com/containerd/shimtest/layers_suite.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/layers_suite.go rename to vendor/github.com/containerd/shimtest/layers_suite.go diff --git a/vendor/github.com/dmcgowan/shimtest/oom_suite.go b/vendor/github.com/containerd/shimtest/oom_suite.go similarity index 99% rename from vendor/github.com/dmcgowan/shimtest/oom_suite.go rename to vendor/github.com/containerd/shimtest/oom_suite.go index b1b9075e..786b54b6 100644 --- a/vendor/github.com/dmcgowan/shimtest/oom_suite.go +++ b/vendor/github.com/containerd/shimtest/oom_suite.go @@ -30,7 +30,6 @@ import ( // can't reliably trigger the kernel OOM killer). type OOMSuite struct { cfg Config - } // NewOOMSuite constructs an OOMSuite from the given options. diff --git a/vendor/github.com/dmcgowan/shimtest/proc_linux.go b/vendor/github.com/containerd/shimtest/proc_linux.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/proc_linux.go rename to vendor/github.com/containerd/shimtest/proc_linux.go diff --git a/vendor/github.com/dmcgowan/shimtest/proc_other.go b/vendor/github.com/containerd/shimtest/proc_other.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/proc_other.go rename to vendor/github.com/containerd/shimtest/proc_other.go diff --git a/vendor/github.com/dmcgowan/shimtest/proc_windows.go b/vendor/github.com/containerd/shimtest/proc_windows.go similarity index 98% rename from vendor/github.com/dmcgowan/shimtest/proc_windows.go rename to vendor/github.com/containerd/shimtest/proc_windows.go index 0c4baa94..902034c5 100644 --- a/vendor/github.com/dmcgowan/shimtest/proc_windows.go +++ b/vendor/github.com/containerd/shimtest/proc_windows.go @@ -45,7 +45,7 @@ type processMemoryCounters struct { } var ( - psapi = windows.NewLazySystemDLL("psapi.dll") + psapi = windows.NewLazySystemDLL("psapi.dll") procGetProcessMemoryInfo = psapi.NewProc("GetProcessMemoryInfo") ) diff --git a/vendor/github.com/dmcgowan/shimtest/rootfs.go b/vendor/github.com/containerd/shimtest/rootfs.go similarity index 99% rename from vendor/github.com/dmcgowan/shimtest/rootfs.go rename to vendor/github.com/containerd/shimtest/rootfs.go index a527071b..f22eacb1 100644 --- a/vendor/github.com/dmcgowan/shimtest/rootfs.go +++ b/vendor/github.com/containerd/shimtest/rootfs.go @@ -36,7 +36,7 @@ import ( erofs "github.com/erofs/go-erofs" ) -const shimtestModulePath = "github.com/dmcgowan/shimtest" +const shimtestModulePath = "github.com/containerd/shimtest" // testbinOS is the OS for which testbin is always built. Regardless of the // host running the tests, the container binary is always a Linux ELF. @@ -390,7 +390,7 @@ func extractErofsIntoDir(tb testing.TB, imgPath, dir string) { if err != nil { return err } - src, err := img.(fs.FS).Open(path) + src, err := img.Open(path) if err != nil { return err } diff --git a/vendor/github.com/dmcgowan/shimtest/run_bench.go b/vendor/github.com/containerd/shimtest/run_bench.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/run_bench.go rename to vendor/github.com/containerd/shimtest/run_bench.go diff --git a/vendor/github.com/dmcgowan/shimtest/run_suite.go b/vendor/github.com/containerd/shimtest/run_suite.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/run_suite.go rename to vendor/github.com/containerd/shimtest/run_suite.go diff --git a/vendor/github.com/dmcgowan/shimtest/shimenv.go b/vendor/github.com/containerd/shimtest/shimenv.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/shimenv.go rename to vendor/github.com/containerd/shimtest/shimenv.go diff --git a/vendor/github.com/dmcgowan/shimtest/stress_suite.go b/vendor/github.com/containerd/shimtest/stress_suite.go similarity index 72% rename from vendor/github.com/dmcgowan/shimtest/stress_suite.go rename to vendor/github.com/containerd/shimtest/stress_suite.go index 9feecaab..921ad6f6 100644 --- a/vendor/github.com/dmcgowan/shimtest/stress_suite.go +++ b/vendor/github.com/containerd/shimtest/stress_suite.go @@ -22,9 +22,12 @@ import ( "context" "errors" "fmt" + "hash/crc32" "io" + "math/rand/v2" "os" "runtime" + "strconv" "strings" "sync" "sync/atomic" @@ -39,7 +42,7 @@ import ( "github.com/opencontainers/runtime-spec/specs-go" "google.golang.org/protobuf/types/known/anypb" - "github.com/dmcgowan/shimtest/internal/transfer" + "github.com/containerd/shimtest/internal/transfer" ) // StressSuite contains long-running stress tests that exercise the @@ -60,6 +63,13 @@ type StressOptions struct { // Transfer enables the bidirectional transfer-service stress // test. The shim under test must implement the transfer service. Transfer bool + + // ExecRSSGrowthOverride, when non-zero, replaces the platform default + // RSS growth threshold for the exec stress test. Use this for shims + // that host a VM or other large runtime in-process and therefore have + // a higher expected one-time RSS step than a thin supervisor shim. + // The value is in bytes. + ExecRSSGrowthOverride int64 } // NewStressSuite constructs a StressSuite from cfg and options. @@ -195,15 +205,82 @@ func doFullLifecycle(t *testing.T, baseCtx context.Context, cfg Config, ttrpcClo // to complete before the next iteration starts. const stressExecConcurrency = 4 +// stressExecSeed is the fixed PRNG seed for the payload-size sequence +// used in the burstexit and round-trip exec stress variants. A fixed +// seed makes each test run produce the same size sequence, so failures +// are reproducible without any extra flags. +const stressExecSeed uint64 = 42 + +// stressExecMinSize and stressExecMaxSize bound the pseudo-random +// payload size (in bytes) chosen per iteration for the burstexit and +// round-trip exec stress variants. +const ( + stressExecMinSize = 32 * 1024 // 32 KiB + stressExecMaxSize = 32 * 1024 * 1024 // 32 MiB +) + +// stressExecKind identifies which exec variant a stress iteration runs. +// Iterations cycle through the kinds in declaration order so that every +// few iterations exercise a different part of the shim's exec pipeline. +type stressExecKind uint8 + +const ( + // stressExecKindEcho execs /bin/echo. No data-integrity check; + // verifies only that the process completes successfully. + stressExecKindEcho stressExecKind = iota + // stressExecKindBurstexit execs /bin/burstexit with a pseudo-random + // payload size. The process writes the tiled payload to stdout and + // exits immediately; the test verifies the full byte count and + // CRC-32. Exercises the shim's close-before-drain path. + stressExecKindBurstexit + // stressExecKindRoundTrip execs /bin/cat with a pseudo-random + // payload piped via stdin and read back from stdout. Verifies byte + // count and CRC-32. Exercises both stdin delivery and stdout drain. + stressExecKindRoundTrip + stressExecKindCount +) + // stressMaxRSSGrowth is the upper bound on shim RSS growth between // the start and end of the exec stress run. Crossing this threshold // indicates an unbounded leak in the shim's per-exec bookkeeping. // -// # Linux (64 MiB) +// # Linux (384 MiB) +// +// Shims that host a VM in-process exhibit a large one-time RSS step +// from VM initialization — vCPU state, virtio device buffers, +// guest RAM, Go runtime heap watermark — that saturates early and +// does not grow linearly with exec count. +// Observed nerdbox data over a 19-minute / ~6000-iter run: // -// The runc shim is a thin supervisor; it fork-execs runc per task and -// quickly exits the child. Its working set stays small. Any per-exec -// retention beyond 64 MiB is a genuine leak. +// RSS after 30s / ~150 iters: +181 MiB +// RSS after 60s / ~325 iters: +187 MiB ← most growth is here +// RSS after 11m / ~3500 iters: +194 MiB +// RSS after 19m / ~6000 iters: +204 MiB ← saturated +// +// Growth from 60s to 19 minutes is only +17 MiB despite 18× more +// iterations; the per-iteration rate drops from ~570 KiB at 30s to +// ~3 KiB at steady state — a clear saturation signature, not a leak. +// +// 384 MiB = ~1.9× the observed 19-min peak (~204 MiB), providing +// enough headroom for CI variance while still catching a genuine +// per-exec leak: at the observed steady-state rate of ~3 KiB/iter a +// true linear leak would cross the threshold after ~60 000 iterations +// (~45 minutes at 22 iter/s), well within a standard CI run window. +// +// Thin supervisor shims (runc-style) on Linux have negligible in-process +// overhead; any retained per-exec state beyond a few MiB is a leak. +// The 384 MiB ceiling gives such shims a 384 MiB budget to detect +// leaks, which is more than sufficient. Use ExecRSSGrowthOverride in +// StressOptions to set a tighter bound if desired. +// +// # macOS (128 MiB) +// +// One-time pool allocations (Go runtime heap watermark, HVF VM state, +// virtio device buffers) produce an expected RSS step of ~100 MiB that +// saturates early in the run rather than growing linearly with exec +// count. 128 MiB = ~1.3× the observed peak step, large enough to +// absorb OS page-cache fluctuation while still catching a true +// per-exec leak at the observed rate of ~5 KB/exec within 10 minutes. // // # Windows (384 MiB) // @@ -231,10 +308,14 @@ const stressExecConcurrency = 4 // before the threshold is crossed, while giving headroom for the // one-time pool growth. var stressMaxRSSGrowth = func() int64 { - if runtime.GOOS == "windows" { + switch runtime.GOOS { + case "windows": return 384 << 20 // 384 MiB — see comment above + case "darwin": + return 128 << 20 // 128 MiB — see comment above + default: + return 384 << 20 // 384 MiB — see comment above (covers in-process VM shims) } - return 64 << 20 // 64 MiB — Linux runc shim is lightweight }() // stressGuestMemSampleInterval is how often the exec stress test @@ -272,7 +353,7 @@ func (s *StressSuite) testExec(t *testing.T) { t.Logf("guest memory sampling unavailable: %v", guestBeforeErr) } - procSpec, err := typeurl.MarshalAnyToProto(&specs.Process{ + echoSpec, err := typeurl.MarshalAnyToProto(&specs.Process{ Args: []string{"/bin/echo", "execstress"}, Cwd: "/", Env: []string{"PATH=/bin:/usr/bin"}, @@ -281,6 +362,14 @@ func (s *StressSuite) testExec(t *testing.T) { t.Fatal("marshal exec spec:", err) } + // Seeded PRNG for payload sizes. Sizes are computed in the main + // goroutine before each iteration's goroutines are spawned so the + // sequence is deterministic regardless of goroutine scheduling. + rng := rand.New(rand.NewPCG(stressExecSeed, 0)) + nextSize := func() int { + return stressExecMinSize + rng.IntN(stressExecMaxSize-stressExecMinSize+1) + } + ctx, cancel := stressCtx(t, env.ctx) defer cancel() @@ -317,21 +406,33 @@ func (s *StressSuite) testExec(t *testing.T) { } }() + numKinds := int64(stressExecKindCount) var iterIdx atomic.Int64 iters, elapsed, runErr := runStress(ctx, func(ctx context.Context) error { i := iterIdx.Add(1) + kind := stressExecKind((i - 1) % numKinds) + var wg sync.WaitGroup var firstErr atomic.Pointer[error] for j := 0; j < stressExecConcurrency; j++ { wg.Add(1) - go func(j int) { + go func(j, size int) { defer wg.Done() execID := fmt.Sprintf("e-%d-%d", i, j) - if err := runOneExec(ctx, env, execID, procSpec); err != nil { + var err error + switch kind { + case stressExecKindEcho: + err = runOneExec(ctx, env, execID, echoSpec) + case stressExecKindBurstexit: + err = runExecBurstexit(ctx, env, execID, size) + case stressExecKindRoundTrip: + err = runExecRoundTrip(ctx, env, execID, size) + } + if err != nil { e := err firstErr.CompareAndSwap(nil, &e) } - }(j) + }(j, nextSize()) } wg.Wait() if e := firstErr.Load(); e != nil { @@ -346,7 +447,7 @@ func (s *StressSuite) testExec(t *testing.T) { guestAfter, guestAfterErr := readGuestMem(env.ctx, env, nextGuestMemID()) rate := float64(iters*stressExecConcurrency) / elapsed.Seconds() - t.Logf("exec: %d iterations × %d execs in %s (%.0f exec/s); host rss %d → %d (Δ %+d)", + t.Logf("exec: %d iterations × %d execs in %s (%.0f exec/s); kinds: echo/burstexit/roundtrip cycling; host rss %d → %d (Δ %+d)", iters, stressExecConcurrency, elapsed.Round(time.Millisecond), rate, rssBefore, rssAfter, rssAfter-rssBefore) @@ -372,9 +473,13 @@ func (s *StressSuite) testExec(t *testing.T) { if runErr != nil { t.Fatalf("exec stress: %v", runErr) } - if growth := rssAfter - rssBefore; growth > stressMaxRSSGrowth { + rssThreshold := stressMaxRSSGrowth + if s.options.ExecRSSGrowthOverride > 0 { + rssThreshold = s.options.ExecRSSGrowthOverride + } + if growth := rssAfter - rssBefore; growth > rssThreshold { t.Errorf("host shim RSS grew %d bytes (threshold %d) during exec stress", - growth, stressMaxRSSGrowth) + growth, rssThreshold) } } @@ -491,6 +596,205 @@ func captureExec(parentCtx context.Context, env *shimEnv, execID string, procSpe return outBuf.String(), nil } +// runExecBurstexit runs a single /bin/burstexit exec inside the shared +// container, captures its stdout, and verifies the full byte count and +// CRC-32 of the tiled payload. size is the number of bytes to request. +func runExecBurstexit(parentCtx context.Context, env *shimEnv, execID string, size int) error { + subCtx, cancel := context.WithTimeout(parentCtx, stressIterationTimeout) + defer cancel() + + dir, err := os.MkdirTemp("", "stress-burst-") + if err != nil { + return fmt.Errorf("mkdtemp: %w", err) + } + defer os.RemoveAll(dir) + + stdoutPath, stdout, cleanupStdout, err := createRawPipe(dir, "stdout") + if err != nil { + return fmt.Errorf("create stdout pipe: %w", err) + } + defer cleanupStdout() + + stderrPath, stderr, cleanupStderr, err := createRawPipe(dir, "stderr") + if err != nil { + return fmt.Errorf("create stderr pipe: %w", err) + } + defer cleanupStderr() + + go io.Copy(io.Discard, stderr) + + h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + var ( + byteCount int64 + gotCRC uint32 + ) + outDone := make(chan error, 1) + go func() { + n, err := io.Copy(h, stdout) + byteCount = n + gotCRC = h.Sum32() + outDone <- err + }() + + procSpec, err := typeurl.MarshalAnyToProto(&specs.Process{ + Args: []string{"/bin/burstexit", strconv.Itoa(size), "0"}, + Cwd: "/", + Env: []string{"PATH=/bin:/usr/bin"}, + }) + if err != nil { + return fmt.Errorf("marshal exec spec: %w", err) + } + + if _, err := env.tc.Exec(subCtx, &taskAPI.ExecProcessRequest{ + ID: env.containerID, + ExecID: execID, + Spec: procSpec, + Stdout: stdoutPath, + Stderr: stderrPath, + }); err != nil { + return fmt.Errorf("exec: %w", err) + } + if _, err := env.tc.Start(subCtx, &taskAPI.StartRequest{ID: env.containerID, ExecID: execID}); err != nil { + return fmt.Errorf("start exec: %w", err) + } + if _, err := env.tc.Wait(subCtx, &taskAPI.WaitRequest{ID: env.containerID, ExecID: execID}); err != nil { + return fmt.Errorf("wait: %w", err) + } + if _, err := env.tc.Delete(subCtx, &taskAPI.DeleteRequest{ID: env.containerID, ExecID: execID}); err != nil { + return fmt.Errorf("delete exec: %w", err) + } + + select { + case readErr := <-outDone: + if readErr != nil { + return fmt.Errorf("stdout read: %w", readErr) + } + case <-subCtx.Done(): + return fmt.Errorf("stdout drain timed out after Delete: %w", subCtx.Err()) + } + + if byteCount != int64(size) { + return fmt.Errorf("stdout truncated: got %d bytes, want %d", byteCount, size) + } + if wantCRC := tiledPayloadCRC32(size); gotCRC != wantCRC { + return fmt.Errorf("stdout CRC mismatch: got %08x, want %08x (data corrupted)", gotCRC, wantCRC) + } + return nil +} + +// runExecRoundTrip runs a single /bin/cat exec inside the shared +// container, writes size bytes of tiled payload to its stdin, and +// verifies that all bytes are echoed back on stdout with the correct +// CRC-32. Exercises both stdin delivery and stdout drain. +func runExecRoundTrip(parentCtx context.Context, env *shimEnv, execID string, size int) error { + subCtx, cancel := context.WithTimeout(parentCtx, stressIterationTimeout) + defer cancel() + + dir, err := os.MkdirTemp("", "stress-rt-") + if err != nil { + return fmt.Errorf("mkdtemp: %w", err) + } + defer os.RemoveAll(dir) + + stdinPath, stdin, cleanupStdin, err := createRawPipeWriter(dir, "stdin") + if err != nil { + return fmt.Errorf("create stdin pipe: %w", err) + } + defer cleanupStdin() + + stdoutPath, stdout, cleanupStdout, err := createRawPipe(dir, "stdout") + if err != nil { + return fmt.Errorf("create stdout pipe: %w", err) + } + defer cleanupStdout() + + stderrPath, stderr, cleanupStderr, err := createRawPipe(dir, "stderr") + if err != nil { + return fmt.Errorf("create stderr pipe: %w", err) + } + defer cleanupStderr() + + go io.Copy(io.Discard, stderr) + + h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + var ( + byteCount int64 + gotCRC uint32 + ) + outDone := make(chan error, 1) + go func() { + n, err := io.Copy(h, stdout) + byteCount = n + gotCRC = h.Sum32() + outDone <- err + }() + + procSpec, err := typeurl.MarshalAnyToProto(&specs.Process{ + Args: []string{"/bin/cat"}, + Cwd: "/", + Env: []string{"PATH=/bin:/usr/bin"}, + }) + if err != nil { + return fmt.Errorf("marshal exec spec: %w", err) + } + + if _, err := env.tc.Exec(subCtx, &taskAPI.ExecProcessRequest{ + ID: env.containerID, + ExecID: execID, + Spec: procSpec, + Stdin: stdinPath, + Stdout: stdoutPath, + Stderr: stderrPath, + }); err != nil { + return fmt.Errorf("exec: %w", err) + } + if _, err := env.tc.Start(subCtx, &taskAPI.StartRequest{ID: env.containerID, ExecID: execID}); err != nil { + return fmt.Errorf("start exec: %w", err) + } + + // Stream the tiled payload to stdin without allocating the full + // buffer; closing stdin signals EOF to /bin/cat, causing it to exit. + writeDone := make(chan error, 1) + go func() { + _, err := io.Copy(stdin, io.LimitReader(&infiniteTileReader{}, int64(size))) + stdin.Close() + writeDone <- err + }() + + select { + case err := <-writeDone: + if err != nil { + return fmt.Errorf("write stdin: %w", err) + } + case <-subCtx.Done(): + return fmt.Errorf("stdin write timed out: %w", subCtx.Err()) + } + + if _, err := env.tc.Wait(subCtx, &taskAPI.WaitRequest{ID: env.containerID, ExecID: execID}); err != nil { + return fmt.Errorf("wait: %w", err) + } + if _, err := env.tc.Delete(subCtx, &taskAPI.DeleteRequest{ID: env.containerID, ExecID: execID}); err != nil { + return fmt.Errorf("delete exec: %w", err) + } + + select { + case readErr := <-outDone: + if readErr != nil { + return fmt.Errorf("stdout read: %w", readErr) + } + case <-subCtx.Done(): + return fmt.Errorf("stdout drain timed out after Delete: %w", subCtx.Err()) + } + + if byteCount != int64(size) { + return fmt.Errorf("stdout truncated: got %d bytes, want %d", byteCount, size) + } + if wantCRC := tiledPayloadCRC32(size); gotCRC != wantCRC { + return fmt.Errorf("stdout CRC mismatch: got %08x, want %08x (data corrupted)", gotCRC, wantCRC) + } + return nil +} + // readGuestMem execs /bin/cat /proc/meminfo inside the container and // returns used memory in bytes (MemTotal - MemAvailable). execID must // be unique across all concurrent execs on the same container. diff --git a/vendor/github.com/dmcgowan/shimtest/testbin/testbin.go b/vendor/github.com/containerd/shimtest/testbin/testbin.go similarity index 87% rename from vendor/github.com/dmcgowan/shimtest/testbin/testbin.go rename to vendor/github.com/containerd/shimtest/testbin/testbin.go index 7b3a2ae0..f1b93e28 100644 --- a/vendor/github.com/dmcgowan/shimtest/testbin/testbin.go +++ b/vendor/github.com/containerd/shimtest/testbin/testbin.go @@ -1,3 +1,19 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + // Package testbin implements the multicall container binary used by shimtest // suites. It is intentionally stdlib-only so that it can be compiled as a // fully static linux binary with CGO_ENABLED=0 and no external dependencies. @@ -6,7 +22,7 @@ // // package main // -// import "github.com/dmcgowan/shimtest/internal/testbin" +// import "github.com/containerd/shimtest/internal/testbin" // // func main() { testbin.Main() } package testbin @@ -378,18 +394,31 @@ func cmdTickexit(_ []string) { os.Exit(7) } +// infiniteTileReader emits an infinite repeating 0x00..0xff tile stream. +// Wrap with io.LimitReader to produce a deterministic payload of exactly +// n bytes without allocating the full buffer. +type infiniteTileReader struct{ off int64 } + +func (r *infiniteTileReader) Read(p []byte) (int, error) { + for i := range p { + p[i] = byte((r.off + int64(i)) % 256) + } + r.off += int64(len(p)) + return len(p), nil +} + // cmdBurstexit writes a deterministic byte stream of the requested // size to stdout as fast as possible, then exits immediately. The -// stream is a repeating sequence of bytes 0x00..0xff so that any -// truncation or corruption is detectable by length or content checks. +// stream is a repeating 0x00..0xff tile so that any truncation or +// corruption is detectable by length or CRC-32 checks. // // Usage: burstexit [exit_code] // // This is used by the FastExitOutput and FastExitInit tests to expose // the close-before-drain race in the shim's IO cleanup path: the -// process exits while bytes are still in-flight through the vsock copy -// goroutines, and a shim that closes the stream connections before -// waiting for the goroutines to drain will truncate the output. +// process exits while bytes are still in-flight, and a shim that +// closes stream connections before the goroutines drain will truncate +// the output. func cmdBurstexit(args []string) { if len(args) < 2 { fmt.Fprintln(os.Stderr, "usage: burstexit [exit_code]") @@ -409,25 +438,8 @@ func cmdBurstexit(args []string) { } } - // Write in 32 KiB chunks to avoid a single enormous syscall while - // still producing output much faster than the vsock copy goroutine - // can drain it. - const chunkSize = 32 * 1024 - chunk := make([]byte, chunkSize) - for i := 0; i < chunkSize; i++ { - chunk[i] = byte(i % 256) - } - - remaining := size - for remaining > 0 { - n := remaining - if n > chunkSize { - n = chunkSize - } - if _, err := os.Stdout.Write(chunk[:n]); err != nil { - os.Exit(1) - } - remaining -= n + if _, err := io.Copy(os.Stdout, io.LimitReader(&infiniteTileReader{}, int64(size))); err != nil { + os.Exit(1) } os.Exit(exitCode) } diff --git a/vendor/github.com/dmcgowan/shimtest/transfer_io.go b/vendor/github.com/containerd/shimtest/transfer_io.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/transfer_io.go rename to vendor/github.com/containerd/shimtest/transfer_io.go diff --git a/vendor/github.com/dmcgowan/shimtest/transfer_suite.go b/vendor/github.com/containerd/shimtest/transfer_suite.go similarity index 99% rename from vendor/github.com/dmcgowan/shimtest/transfer_suite.go rename to vendor/github.com/containerd/shimtest/transfer_suite.go index 04ca9fd0..3b6f751c 100644 --- a/vendor/github.com/dmcgowan/shimtest/transfer_suite.go +++ b/vendor/github.com/containerd/shimtest/transfer_suite.go @@ -32,7 +32,7 @@ import ( "github.com/containerd/typeurl/v2" "github.com/opencontainers/runtime-spec/specs-go" - "github.com/dmcgowan/shimtest/internal/transfer" + "github.com/containerd/shimtest/internal/transfer" ) // TransferSuite contains tests gated on the "transfer" feature: the diff --git a/vendor/github.com/dmcgowan/shimtest/uds_bench.go b/vendor/github.com/containerd/shimtest/uds_bench.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/uds_bench.go rename to vendor/github.com/containerd/shimtest/uds_bench.go diff --git a/vendor/github.com/dmcgowan/shimtest/uds_suite.go b/vendor/github.com/containerd/shimtest/uds_suite.go similarity index 99% rename from vendor/github.com/dmcgowan/shimtest/uds_suite.go rename to vendor/github.com/containerd/shimtest/uds_suite.go index 9c5bfccb..8e25515c 100644 --- a/vendor/github.com/dmcgowan/shimtest/uds_suite.go +++ b/vendor/github.com/containerd/shimtest/uds_suite.go @@ -36,7 +36,6 @@ import ( // UDSSuite contains the UDS-mount tests, gated on the "uds" feature. type UDSSuite struct { cfg Config - } // NewUDSSuite constructs a UDSSuite from the given options. diff --git a/vendor/github.com/dmcgowan/shimtest/version.go b/vendor/github.com/containerd/shimtest/version.go similarity index 100% rename from vendor/github.com/dmcgowan/shimtest/version.go rename to vendor/github.com/containerd/shimtest/version.go diff --git a/vendor/modules.txt b/vendor/modules.txt index db067b7a..009f6e0a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -145,6 +145,11 @@ github.com/containerd/platforms ## explicit; go 1.22 github.com/containerd/plugin github.com/containerd/plugin/registry +# github.com/containerd/shimtest v0.2.1 +## explicit; go 1.26.3 +github.com/containerd/shimtest +github.com/containerd/shimtest/internal/transfer +github.com/containerd/shimtest/testbin # github.com/containerd/ttrpc v1.2.9-0.20260501231634-6c2eed2b612e ## explicit; go 1.23 github.com/containerd/ttrpc @@ -157,11 +162,6 @@ github.com/coreos/go-systemd/v22/dbus # github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc ## explicit github.com/davecgh/go-spew/spew -# github.com/dmcgowan/shimtest v0.1.8 -## explicit; go 1.26.3 -github.com/dmcgowan/shimtest -github.com/dmcgowan/shimtest/internal/transfer -github.com/dmcgowan/shimtest/testbin # github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c ## explicit github.com/docker/go-events