|
| 1 | +/** |
| 2 | + * Copyright (c) Facebook, Inc. and its affiliates. |
| 3 | + */ |
| 4 | + |
| 5 | +#include <array> |
| 6 | +#include <iostream> |
| 7 | +#include <memory> |
| 8 | +#include <typeinfo> |
| 9 | + |
| 10 | +#include "gloo/allreduce.h" |
| 11 | +#include "gloo/allreduce_ring.h" |
| 12 | +#include "gloo/rendezvous/context.h" |
| 13 | +#include "gloo/rendezvous/file_store.h" |
| 14 | +#include "gloo/rendezvous/prefix_store.h" |
| 15 | +#include "gloo/transport/uv/device.h" |
| 16 | + |
| 17 | +// Usage: |
| 18 | +// |
| 19 | +// Open two terminals. Run the same program in both terminals, using |
| 20 | +// a different RANK in each. For example: |
| 21 | +// |
| 22 | +// A: PREFIX=test1 SIZE=2 RANK=0 example_allreduce |
| 23 | +// B: PREFIX=test1 SIZE=2 RANK=1 example_allreduce |
| 24 | +// |
| 25 | +// Expected output: |
| 26 | +// |
| 27 | +// data[0] = 18 |
| 28 | +// data[1] = 18 |
| 29 | +// data[2] = 18 |
| 30 | +// data[3] = 18 |
| 31 | +// |
| 32 | + |
| 33 | +void mysum(void* c_, const void* a_, const void* b_, int n) { |
| 34 | + printf("n=%d\r\n", n); |
| 35 | + int* c = static_cast<int*>(c_); |
| 36 | + const int* a = static_cast<const int*>(a_); |
| 37 | + const int* b = static_cast<const int*>(b_); |
| 38 | + for (auto i = 0; i < n; i++) { |
| 39 | + printf("a[%d]=%d\r\n", i, a[i]); |
| 40 | + printf("b[%d]=%d\r\n", i, b[i]); |
| 41 | + c[i] = a[i] + b[i]; |
| 42 | + printf("c[%d]=%d\r\n", i, c[i]); |
| 43 | + } |
| 44 | +} |
| 45 | + |
| 46 | +int main(void) { |
| 47 | + if (getenv("PREFIX") == nullptr || getenv("SIZE") == nullptr || |
| 48 | + getenv("RANK") == nullptr) { |
| 49 | + std::cerr << "Please set environment variables PREFIX, SIZE, and RANK." |
| 50 | + << std::endl; |
| 51 | + return 1; |
| 52 | + } |
| 53 | + |
| 54 | + // The following statement creates a TCP "device" for Gloo to use. |
| 55 | + // See "gloo/transport/device.h" for more information. For the |
| 56 | + // purposes of this example, it is sufficient to see the device as |
| 57 | + // a factory for every communication pair. |
| 58 | + // |
| 59 | + // The argument to gloo::transport::tcp::CreateDevice is used to |
| 60 | + // find the network interface to bind connection to. The attr struct |
| 61 | + // can be populated to specify exactly which interface should be |
| 62 | + // used, as shown below. This is useful if you have identical |
| 63 | + // multi-homed machines that all share the same network interface |
| 64 | + // name, for example. |
| 65 | + // |
| 66 | + gloo::transport::uv::attr attr; |
| 67 | + // attr.iface = "eth0"; |
| 68 | + // attr.iface = "ib0"; |
| 69 | + // attr.iface = "Wi-Fi"; |
| 70 | + |
| 71 | + // attr.ai_family = AF_INET; // Force IPv4 |
| 72 | + // attr.ai_family = AF_INET6; // Force IPv6 |
| 73 | + attr.ai_family = AF_UNSPEC; // Use either (default) |
| 74 | + |
| 75 | + // A string is implicitly converted to an "attr" struct with its |
| 76 | + // hostname field populated. This will try to resolve the interface |
| 77 | + // to use by resolving the hostname or IP address, and finding the |
| 78 | + // corresponding network interface. |
| 79 | + // |
| 80 | + // Hostname "localhost" should resolve to 127.0.0.1, so using this |
| 81 | + // implies that all connections will be local. This can be useful |
| 82 | + // for single machine operation. |
| 83 | + // |
| 84 | + // auto dev = gloo::transport::tcp::CreateDevice("localhost"); |
| 85 | + // |
| 86 | + |
| 87 | + auto dev = gloo::transport::uv::CreateDevice(attr); |
| 88 | + |
| 89 | + // Now that we have a device, we can connect all participating |
| 90 | + // processes. We call this process "rendezvous". It can be performed |
| 91 | + // using a shared filesystem, a Redis instance, or something else by |
| 92 | + // extending it yourself. |
| 93 | + // |
| 94 | + // See "gloo/rendezvous/store.h" for the functionality you need to |
| 95 | + // implement to create your own store for performing rendezvous. |
| 96 | + // |
| 97 | + // Below, we instantiate rendezvous using the filesystem, given that |
| 98 | + // this example uses multiple processes on a single machine. |
| 99 | + // |
| 100 | + auto fileStore = gloo::rendezvous::FileStore("/libtmp"); |
| 101 | + |
| 102 | + // To be able to reuse the same store over and over again and not have |
| 103 | + // interference between runs, we scope it to a unique prefix with the |
| 104 | + // PrefixStore. This wraps another store and prefixes every key before |
| 105 | + // forwarding the call to the underlying store. |
| 106 | + std::string prefix = getenv("PREFIX"); |
| 107 | + auto prefixStore = gloo::rendezvous::PrefixStore(prefix, fileStore); |
| 108 | + |
| 109 | + // Using this store, we can now create a Gloo context. The context |
| 110 | + // holds a reference to every communication pair involving this |
| 111 | + // process. It is used by every collective algorithm to find the |
| 112 | + // current process's rank in the collective, the collective size, |
| 113 | + // and setup of send/receive buffer pairs. |
| 114 | + const int rank = atoi(getenv("RANK")); |
| 115 | + const int size = atoi(getenv("SIZE")); |
| 116 | + auto context = std::make_shared<gloo::rendezvous::Context>(rank, size); |
| 117 | + context->connectFullMesh(prefixStore, dev); |
| 118 | + |
| 119 | + // All connections are now established. We can now initialize some |
| 120 | + // test data, instantiate the collective algorithm, and run it. |
| 121 | + size_t elements = 4; |
| 122 | + std::vector<int*> inputPointers; |
| 123 | + std::vector<int*> outputPointers; |
| 124 | + for (size_t i = 0; i < elements; i++) { |
| 125 | + int* value = reinterpret_cast<int*>(malloc(sizeof(int))); |
| 126 | + *value = i * (rank + 1); |
| 127 | + inputPointers.push_back(value); |
| 128 | + int* value1 = reinterpret_cast<int*>(malloc(sizeof(int))); |
| 129 | + *value1 = 0; |
| 130 | + outputPointers.push_back(value1); |
| 131 | + } |
| 132 | + |
| 133 | + // Configure AllreduceOptions struct |
| 134 | + gloo::AllreduceOptions opts_(context); |
| 135 | + opts_.setInputs(inputPointers, 1); |
| 136 | + opts_.setOutputs(outputPointers, 1); |
| 137 | + opts_.setAlgorithm(gloo::AllreduceOptions::Algorithm::RING); |
| 138 | + void (*fn)(void*, const void*, const void*, int) = &mysum; |
| 139 | + opts_.setReduceFunction(fn); |
| 140 | + gloo::allreduce(opts_); |
| 141 | + |
| 142 | + // Print the result. |
| 143 | + std::cout << "Output: " << std::endl; |
| 144 | + for (int i = 0; i < outputPointers.size(); i++) { |
| 145 | + std::cout << "data[" << i << "] = " << *outputPointers[i] << std::endl; |
| 146 | + } |
| 147 | + |
| 148 | + return 0; |
| 149 | +} |
0 commit comments