Combines a CUDA event with a CPU threading event to enforce record->wait ordering across two threads.
This class is designed for exactly two threads: one producer that calls record() and one consumer that calls wait(). Using it with more than two threads is not supported and will produce undefined behavior.
CUDA events alone are insufficient for cross-thread synchronization because waiting on an unrecorded CUDA event is a no-op: the wait returns immediately instead of blocking. This class adds a threading.Event so that the waiting thread blocks on the CPU side until record() is called, at which point the CUDA event is guaranteed to be in-flight and event.wait() will correctly synchronize the GPU stream.
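The record->wait ordering can be sketched without a GPU by substituting a second threading.Event for the CUDA event; `CpuOnlyEvent` below is a hypothetical stand-in, not part of vllm (the real class calls torch.cuda.Event.record()/wait() where noted):

```python
import threading

class CpuOnlyEvent:
    """CPU-only analog of CpuGpuEvent: one producer, one consumer."""

    def __init__(self):
        self._recorded = threading.Event()

    def record(self):
        # In CpuGpuEvent this is where torch.cuda.Event.record() runs
        # before the flag is set, so the CUDA event is in-flight.
        if self._recorded.is_set():
            raise RuntimeError("record() called before previous wait()")
        self._recorded.set()

    def wait(self):
        self._recorded.wait()   # block until the producer has recorded
        self._recorded.clear()  # re-arm for the next record/wait cycle

order = []
ev = CpuOnlyEvent()

def consumer():
    ev.wait()                  # blocks here until record() below
    order.append("consumed")

t = threading.Thread(target=consumer)
t.start()
order.append("recorded")
ev.record()
t.join()
print(order)  # ['recorded', 'consumed']
```

Because the consumer blocks on the CPU-side flag, "consumed" can never be appended before "recorded", which is exactly the guarantee the CUDA event alone cannot provide.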
Source code in vllm/distributed/eplb/eplb_utils.py
```python
class CpuGpuEvent:
    """
    Combines a CUDA event with a CPU threading event to enforce record->wait
    ordering across two threads.

    This class is designed for exactly two threads: one producer that calls
    record() and one consumer that calls wait(). Using it with more than two
    threads is not supported and will produce undefined behavior.

    CUDA events alone are insufficient for cross-thread synchronization because
    waiting on an unrecorded CUDA event is a no-op. The wait will return
    immediately instead of blocking. This class adds a threading.Event so
    that the waiting thread blocks on the CPU side until record() is called, at
    which point the CUDA event is guaranteed to be in-flight and event.wait()
    will correctly synchronize the GPU stream.
    """

    def __init__(self):
        self._event = torch.cuda.Event()
        self._recorded = threading.Event()

    def wait(self, stream: torch.cuda.Stream | None = None):
        """
        Blocks the calling thread until record finishes. Used to guarantee
        that the record kernel is called before wait.

        Should only be called by the Async Eplb thread.
        """
        self._recorded.wait()
        self._event.wait(stream)
        self._recorded.clear()

    def record(self, stream: torch.cuda.Stream | None = None):
        """
        Unblocks the waiting thread after calling event.record().

        Should only be called by the main thread.
        """
        if self._recorded.is_set():
            raise RuntimeError(
                "CpuGpuEvent.record() called before the previous event was "
                "consumed by wait()"
            )
        self._event = torch.cuda.Event()
        self._event.record(stream)
        self._recorded.set()
```
record
record(stream: Stream | None = None)
Unblocks the waiting thread after calling event.record().
Should only be called by the main thread.
Source code in vllm/distributed/eplb/eplb_utils.py
```python
def record(self, stream: torch.cuda.Stream | None = None):
    """
    Unblocks the waiting thread after calling event.record().

    Should only be called by the main thread.
    """
    if self._recorded.is_set():
        raise RuntimeError(
            "CpuGpuEvent.record() called before the previous event was "
            "consumed by wait()"
        )
    self._event = torch.cuda.Event()
    self._event.record(stream)
    self._recorded.set()
```
wait
wait(stream: Stream | None = None)
Blocks the calling thread until record finishes. Used to guarantee that the record kernel is called before wait.
Should only be called by the Async Eplb thread.
Source code in vllm/distributed/eplb/eplb_utils.py
```python
def wait(self, stream: torch.cuda.Stream | None = None):
    """
    Blocks the calling thread until record finishes. Used to guarantee
    that the record kernel is called before wait.

    Should only be called by the Async Eplb thread.
    """
    self._recorded.wait()
    self._event.wait(stream)
    self._recorded.clear()
```