diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 33b6722433..ca16c9d67e 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -38,65 +38,29 @@ jobs: environment: [py310, py311, py312, py313, py314] task: [test-ci] # Cherry-pick test modules to split the overall runtime roughly in half - partition: [ci1, not ci1] + partition: [ci1] include: # MacOS CI does not have many hosts available; run it on 3.14 only - os: macos-latest environment: py314 task: test-ci partition: ci1 - - os: macos-latest - environment: py314 - task: test-ci - partition: not ci1 # Minimum dependencies - os: ubuntu-latest environment: mindeps task: test-ci partition: ci1 - - os: ubuntu-latest - environment: mindeps - task: test-ci - partition: not ci1 - os: ubuntu-latest environment: mindeps-array task: test-ci partition: ci1 - - os: ubuntu-latest - environment: mindeps-array - task: test-ci - partition: not ci1 - os: ubuntu-latest environment: mindeps-dataframe task: test-ci partition: ci1 - - os: ubuntu-latest - environment: mindeps-dataframe - task: test-ci - partition: not ci1 - - # Set distributed.scheduler.worker-saturation: .inf - - os: ubuntu-latest - environment: py310 - task: test-noqueue - partition: ci1 - - os: ubuntu-latest - environment: py310 - task: test-noqueue - partition: not ci1 - - # Free-threading (WIP - tests don't pass yet) - # - os: ubuntu-latest - # environment: py314t - # task: test-ci - # partition: ci1 - # - os: ubuntu-latest - # environment: py314t - # task: test-ci - # partition: not ci1 steps: - name: Checkout source @@ -150,8 +114,7 @@ jobs: set -o pipefail mkdir reports - pixi run -e ${{ matrix.environment }} ${{ matrix.task }} \ - -m "${{ matrix.partition }}" \ + pixi run -e ${{ matrix.environment }} ${{ matrix.task }} distributed/tests/test_stress.py::test_chaos_rechunk --count 100 \ | tee pytest-stdout.log - name: Generate or post-process coverage.xml and pytest.xml diff --git a/distributed/nanny.py b/distributed/nanny.py index 3a596589e2..392884e175 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -585,20 +585,24 @@ async def close( # type: ignore[override] self.status = Status.closing logger.info("Closing Nanny at %r. Reason: %s", self.address_safe, reason) - await self.preloads.teardown() - - await asyncio.gather(*(self.plugin_remove(name) for name in self.plugins)) + try: + await self.preloads.teardown() - self.stop() - if self.process is not None: - await self.kill(timeout=timeout, reason=reason) + await asyncio.gather(*(self.plugin_remove(name) for name in self.plugins)) - self.process = None - await self.rpc.close() - self.status = Status.closed - await super().close() - self.__exit_stack.__exit__(None, None, None) - logger.info("Nanny at %r closed.", self.address_safe) + self.stop() + if self.process is not None: + await self.kill(timeout=timeout, reason=reason) + finally: + # Whatever happens, e.g. a timeout while killing the worker process, + # the nanny must reach a terminal status; otherwise concurrent and + # future calls to close() would wait forever on self.finished(). + self.process = None + self.status = Status.closed + await self.rpc.close() + await super().close() + self.__exit_stack.__exit__(None, None, None) + logger.info("Nanny at %r closed.", self.address_safe) return "OK" async def _log_event(self, topic, msg): @@ -802,15 +806,14 @@ async def kill( ) -> None: """ Ensure the worker process is stopped, waiting at most - ``timeout * 0.8`` seconds before killing it abruptly. + ``timeout * 0.8`` seconds before killing it abruptly; after that, wait + up to ``timeout`` more seconds for the killed process to be joined. When `kill` returns, the worker process has been joined. - If the worker process does not terminate within ``timeout`` seconds, - even after being killed, `asyncio.TimeoutError` is raised. + If the worker process does not terminate within ``timeout * 1.8`` + seconds, even after being killed, `asyncio.TimeoutError` is raised. """ - deadline = time() + timeout - # If the process is not properly up it will not watch the closing queue # and we may end up leaking this process # Therefore wait for it to be properly started before killing it @@ -855,7 +858,12 @@ async def kill( f"Worker process still alive after {wait_timeout:.1f} seconds, killing" ) await process.kill() - await process.join(max(0, deadline - time())) + # A killed process can't linger except in exceptional circumstances, + # e.g. it's hanging in uninterruptible I/O. However, joining it may + # take a substantial amount of time when the host is heavily loaded, + # so wait with a fresh timeout rather than with whatever is left of + # the original one. + await process.join(timeout) except ValueError as e: if "invalid operation on closed AsyncProcess" in str(e): return diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index 794fc530a4..d4b8b84701 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -310,6 +310,9 @@ async def test_no_delay_during_large_transfer(c, s, w): client=True, Worker=Nanny, nthreads=[("", 2)] * 6, + # On heavily loaded CI hosts, starting the cluster can take longer than the + # whole default budget, while the test body by design runs for 10+ seconds + timeout=120, scheduler_kwargs={"transition_counter_max": 500_000}, worker_kwargs={"transition_counter_max": 500_000}, )