From 7f3b088744b184d595c0daeb2d721c2c8908f4bc Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 31 Jan 2025 17:52:15 -0800 Subject: [PATCH] Add multi-nodes example & update doc (#455) Documentation update: * [`docs/design/mscclpp-dsl.md`](diffhunk://#diff-02a69290fb3e02b8a069bf915fbf5266cfc2ac51c6e9ff8b5b19df51ed909b22L114-R114): Updated the link to the examples folder to reflect the correct path. New example script: * [`python/examples/allgather_allpairs_multinodes_packets.py`](diffhunk://#diff-ab42c16ecca0680d55b60b82a6913138c5fba4069b9c4493fbe8c72217fe54bcR1-R76): Added a new example script demonstrating the allgather all-pairs algorithm across multiple nodes using packet communication. IR module improvements: * [`python/mscclpp/language/ir.py`](diffhunk://#diff-b025796b03fbbd9b2ca9aee2569547efa7a56101743bc4aa05661be0b52aeec9L470-R472): Refined the sorting criteria for GPU instance channels and thread block channels to include the channel type, ensuring a more accurate order. Debugging enhancements: * [`src/executor/executor.cc`](diffhunk://#diff-60f7806d111e5cc12ded06358b5d5b09b8521e3858f182d8be81ac05147c535dR439-R441): Added a debug log to indicate the start of communication collective execution with details about the execution plan and collective. * [`src/include/debug.h`](diffhunk://#diff-24e5fda55e3712277be4bb99b3c348294a77ebd3046bfe716b74bdb32cd203dfR89): Introduced a new debug log subsystem identifier `MSCCLPP_EXECUTOR` for logging executor-related information. 
--- .github/workflows/mscclpp-lang.yml | 4 + docker/build.sh | 1 - docs/design/mscclpp-dsl.md | 2 +- .../allgather_allpairs_multinodes_packets.py | 74 +++++++++++++++++++ python/mscclpp/language/ir.py | 6 +- .../configs/mscclpp_lang_test_config.json | 4 + src/executor/executor.cc | 3 + src/include/debug.h | 1 + 8 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 python/examples/allgather_allpairs_multinodes_packets.py diff --git a/.github/workflows/mscclpp-lang.yml b/.github/workflows/mscclpp-lang.yml index a83b8475f..287b53691 100644 --- a/.github/workflows/mscclpp-lang.yml +++ b/.github/workflows/mscclpp-lang.yml @@ -19,6 +19,10 @@ jobs: steps: - uses: actions/checkout@v4 + + - name: Set environment variable + run: echo "LD_LIBRARY_PATH=/usr/local/cuda/compat:/usr/local/cuda/lib64" >> $GITHUB_ENV + - name: Install mscclpp run: | CMAKE_ARGS="-DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON" pip3 install . diff --git a/docker/build.sh b/docker/build.sh index af4a23025..ff9fd581f 100755 --- a/docker/build.sh +++ b/docker/build.sh @@ -14,7 +14,6 @@ baseImageTable=( declare -A extraLdPathTable extraLdPathTable=( - ["cuda11.8"]="/usr/local/cuda-11.8/lib64" ["cuda12.1"]="/usr/local/cuda-12.1/compat:/usr/local/cuda-12.1/lib64" ["cuda12.2"]="/usr/local/cuda-12.2/compat:/usr/local/cuda-12.2/lib64" ["cuda12.3"]="/usr/local/cuda-12.3/compat:/usr/local/cuda-12.3/lib64" diff --git a/docs/design/mscclpp-dsl.md b/docs/design/mscclpp-dsl.md index 9b6955e81..cfb8072fc 100644 --- a/docs/design/mscclpp-dsl.md +++ b/docs/design/mscclpp-dsl.md @@ -111,4 +111,4 @@ Packet APIs are used when user wants to use LL algorithm. The packet APIs are si ### Examples -We provide several examples demonstrating how to use the MSCCL++ DSL to write communication collective algorithms. For more details, please refer to the [examples](https://github.com/microsoft/mscclpp/tree/main/mscclpp-lang/python/examples) folder. 
\ No newline at end of file +We provide several examples demonstrating how to use the MSCCL++ DSL to write communication collective algorithms. For more details, please refer to the [examples](https://github.com/microsoft/mscclpp/tree/main/python/examples) folder. \ No newline at end of file diff --git a/python/examples/allgather_allpairs_multinodes_packets.py b/python/examples/allgather_allpairs_multinodes_packets.py new file mode 100644 index 000000000..a08b54dd4 --- /dev/null +++ b/python/examples/allgather_allpairs_multinodes_packets.py @@ -0,0 +1,74 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import argparse +from mscclpp.language import * +from mscclpp.language.collectives import AllGather +from mscclpp.language.buffer import Buffer +from mscclpp.language.types import ChannelType, ReplicationPolicy + + +def allgather_multinodes_allpair(gpus, gpus_per_node, instances): + """ + Implements a multi-node allgather collective using an allpairs algorithm with MSCCL++ DSL. + @param gpus: Total number of GPUs + @param gpus_per_node: Number of GPUs per node + Steps: + 1. Each rank sends a chunk to all other ranks' scratch buffers using packet format. + 2. Copy the chunk from the scratch buffer to the output buffer using packet format. 
+ """ + collective = AllGather(gpus, 1, True) + with MSCCLPPProgram( + "allgather_multinodes_allpair", + collective, + gpus, + instances, + protocol="LL", + replication_policy=ReplicationPolicy.interleaved, + num_threads_per_block=1024, + ): + for g in range(gpus): + src_rank = g + c = chunk(src_rank, Buffer.input, 0, 1) + for peer in range(1, gpus): + dst_rank = (src_rank + peer) % gpus + tb = dst_rank if dst_rank < src_rank else dst_rank - 1 + if src_rank // gpus_per_node == dst_rank // gpus_per_node: + c.put_packet(dst_rank, Buffer.scratch, index=src_rank, sendtb=tb) + else: + c.put_packet( + dst_rank, + Buffer.scratch, + index=src_rank, + sendtb=tb, + chan_type=ChannelType.port, + temp_buffer=Buffer.scratch, + temp_buffer_index=src_rank, + ) + + # Copying packet from local scratch buffer to local buffer + for g in range(gpus): + src_rank = g + src_offset = src_rank + for peer in range(1, gpus): + dst_rank = (g + peer) % gpus + tb = src_offset if src_offset < dst_rank else src_offset - 1 + c = chunk(dst_rank, Buffer.scratch, src_offset, 1) + c.copy_packet(dst_rank, Buffer.output, src_offset, sendtb=tb + gpus - 1) + + Json() + Check() + + +parser = argparse.ArgumentParser() +parser.add_argument("num_gpus", type=int, help="number of gpus") +parser.add_argument("gpus_per_node", type=int, help="number of gpus per node") +parser.add_argument("instances", type=int, help="number of instances") + +args = parser.parse_args() + +allgather_multinodes_allpair( + args.num_gpus, + args.gpus_per_node, + args.instances, +) diff --git a/python/mscclpp/language/ir.py b/python/mscclpp/language/ir.py index 4cb12e6da..16a5b67fa 100644 --- a/python/mscclpp/language/ir.py +++ b/python/mscclpp/language/ir.py @@ -467,7 +467,9 @@ def remove_empty_fields(d): obj["connectedTo"] = [sorted(list(peers)) for peers in obj["connectedTo"]] gpu_instance["channels"].append(obj) gpu_instance["channels"] = list(filter(lambda x: x["type"] != "none", gpu_instance["channels"])) - gpu_instance["channels"] = 
sorted(gpu_instance["channels"], key=lambda x: (x["srcbuff"], x["dstbuff"])) + gpu_instance["channels"] = sorted( + gpu_instance["channels"], key=lambda x: (x["srcbuff"], x["dstbuff"], x["type"]) + ) # render for GPU NVLS channels for i, chan in enumerate(gpu_instance["channels"]): @@ -502,7 +504,7 @@ def remove_empty_fields(d): tb_channel_dict[(srcBuffer, dstBuffer, type)] = obj tb_channels.append(obj) tb_channels = filter(lambda x: x["type"] != "none", tb_channels) - tb_channels = sorted(tb_channels, key=lambda x: (x["srcbuff"], x["dstbuff"])) + tb_channels = sorted(tb_channels, key=lambda x: (x["srcbuff"], x["dstbuff"], x["type"])) for op in tb.ops: if op.tb == -1: continue diff --git a/python/test/configs/mscclpp_lang_test_config.json b/python/test/configs/mscclpp_lang_test_config.json index f4cbaf930..e8a1d3e1e 100644 --- a/python/test/configs/mscclpp_lang_test_config.json +++ b/python/test/configs/mscclpp_lang_test_config.json @@ -30,5 +30,9 @@ { "filename": "allreduce_nvls.py", "args": ["8", "2"] + }, + { + "filename": "allgather_allpairs_multinodes_packets.py", + "args": ["16", "8", "1"] } ] diff --git a/src/executor/executor.cc b/src/executor/executor.cc index 25e55bb56..da3ab973b 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -7,6 +7,7 @@ #include #include +#include "debug.h" #include "execution_kernel.hpp" #include "execution_plan.hpp" @@ -435,6 +436,8 @@ Executor::Executor(std::shared_ptr comm) : impl_(std::make_unique< void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize, [[maybe_unused]] size_t recvBuffSize, DataType dataType, const ExecutionPlan& plan, cudaStream_t stream, PacketType packetType) { + INFO(MSCCLPP_EXECUTOR, "Starting execution with plan: %s, collective: %s", plan.name().c_str(), + plan.collective().c_str()); size_t sendMemRange, recvMemRange; CUdeviceptr sendBasePtr, recvBasePtr; MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendMemRange, (CUdeviceptr)sendbuff)); diff --git 
a/src/include/debug.h b/src/include/debug.h index 713371b62..893b96d0f 100644 --- a/src/include/debug.h +++ b/src/include/debug.h @@ -86,6 +86,7 @@ typedef enum { MSCCLPP_ENV = 128, MSCCLPP_ALLOC = 256, MSCCLPP_CALL = 512, + MSCCLPP_EXECUTOR = 1024, MSCCLPP_ALL = ~0 } mscclppDebugLogSubSys;