Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 2 additions & 39 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,65 +38,29 @@ jobs:
environment: [py310, py311, py312, py313, py314]
task: [test-ci]
# Cherry-pick test modules to split the overall runtime roughly in half
partition: [ci1, not ci1]
partition: [ci1]
include:
# MacOS CI does not have many hosts available; run it on 3.14 only
- os: macos-latest
environment: py314
task: test-ci
partition: ci1
- os: macos-latest
environment: py314
task: test-ci
partition: not ci1

# Minimum dependencies
- os: ubuntu-latest
environment: mindeps
task: test-ci
partition: ci1
- os: ubuntu-latest
environment: mindeps
task: test-ci
partition: not ci1

- os: ubuntu-latest
environment: mindeps-array
task: test-ci
partition: ci1
- os: ubuntu-latest
environment: mindeps-array
task: test-ci
partition: not ci1

- os: ubuntu-latest
environment: mindeps-dataframe
task: test-ci
partition: ci1
- os: ubuntu-latest
environment: mindeps-dataframe
task: test-ci
partition: not ci1

# Set distributed.scheduler.worker-saturation: .inf
- os: ubuntu-latest
environment: py310
task: test-noqueue
partition: ci1
- os: ubuntu-latest
environment: py310
task: test-noqueue
partition: not ci1

# Free-threading (WIP - tests don't pass yet)
# - os: ubuntu-latest
# environment: py314t
# task: test-ci
# partition: ci1
# - os: ubuntu-latest
# environment: py314t
# task: test-ci
# partition: not ci1

steps:
- name: Checkout source
Expand Down Expand Up @@ -150,8 +114,7 @@ jobs:
set -o pipefail
mkdir reports

pixi run -e ${{ matrix.environment }} ${{ matrix.task }} \
-m "${{ matrix.partition }}" \
pixi run -e ${{ matrix.environment }} ${{ matrix.task }} distributed/tests/test_stress.py::test_chaos_rechunk --count 100 \
| tee pytest-stdout.log

- name: Generate or post-process coverage.xml and pytest.xml
Expand Down
44 changes: 26 additions & 18 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,20 +585,24 @@ async def close( # type: ignore[override]
self.status = Status.closing
logger.info("Closing Nanny at %r. Reason: %s", self.address_safe, reason)

await self.preloads.teardown()

await asyncio.gather(*(self.plugin_remove(name) for name in self.plugins))
try:
await self.preloads.teardown()

self.stop()
if self.process is not None:
await self.kill(timeout=timeout, reason=reason)
await asyncio.gather(*(self.plugin_remove(name) for name in self.plugins))

self.process = None
await self.rpc.close()
self.status = Status.closed
await super().close()
self.__exit_stack.__exit__(None, None, None)
logger.info("Nanny at %r closed.", self.address_safe)
self.stop()
if self.process is not None:
await self.kill(timeout=timeout, reason=reason)
finally:
# Whatever happens, e.g. a timeout while killing the worker process,
# the nanny must reach a terminal status; otherwise concurrent and
# future calls to close() would wait forever on self.finished().
self.process = None
self.status = Status.closed
await self.rpc.close()
await super().close()
self.__exit_stack.__exit__(None, None, None)
logger.info("Nanny at %r closed.", self.address_safe)
return "OK"

async def _log_event(self, topic, msg):
Expand Down Expand Up @@ -802,15 +806,14 @@ async def kill(
) -> None:
"""
Ensure the worker process is stopped, waiting at most
``timeout * 0.8`` seconds before killing it abruptly.
``timeout * 0.8`` seconds before killing it abruptly; after that, wait
up to ``timeout`` more seconds for the killed process to be joined.

When `kill` returns, the worker process has been joined.

If the worker process does not terminate within ``timeout`` seconds,
even after being killed, `asyncio.TimeoutError` is raised.
If the worker process does not terminate within ``timeout * 1.8``
seconds, even after being killed, `asyncio.TimeoutError` is raised.
"""
deadline = time() + timeout

# If the process is not properly up it will not watch the closing queue
# and we may end up leaking this process
# Therefore wait for it to be properly started before killing it
Expand Down Expand Up @@ -855,7 +858,12 @@ async def kill(
f"Worker process still alive after {wait_timeout:.1f} seconds, killing"
)
await process.kill()
await process.join(max(0, deadline - time()))
# A killed process can't linger except in exceptional circumstances,
# e.g. it's hanging in uninterruptible I/O. However, joining it may
# take a substantial amount of time when the host is heavily loaded,
# so wait with a fresh timeout rather than with whatever is left of
# the original one.
await process.join(timeout)
except ValueError as e:
if "invalid operation on closed AsyncProcess" in str(e):
return
Expand Down
3 changes: 3 additions & 0 deletions distributed/tests/test_stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ async def test_no_delay_during_large_transfer(c, s, w):
client=True,
Worker=Nanny,
nthreads=[("", 2)] * 6,
# On heavily loaded CI hosts, starting the cluster can take longer than the
# whole default budget, while the test body by design runs for 10+ seconds
timeout=120,
scheduler_kwargs={"transition_counter_max": 500_000},
worker_kwargs={"transition_counter_max": 500_000},
)
Expand Down
Loading