NCCL
NCCL是英伟达开源的GPU通信库,支持集合通信和点对点通信。
看下官方给的一个demo:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
#include <stdio.h> #include "cuda_runtime.h" #include "nccl.h" #include "mpi.h" #include <unistd.h> #include <stdint.h> #define MPICHECK(cmd) do { \ int e = cmd; \ if ( e ! = MPI_SUCCESS ) { \ printf( "Failed: MPI error %s:%d '%d'\n" , \ __FILE__,__LINE__, e); \ exit(EXIT_FAILURE); \ } \ } while ( 0 ) #define CUDACHECK(cmd) do { \ cudaError_t e = cmd; \ if ( e ! = cudaSuccess ) { \ printf( "Failed: Cuda error %s:%d '%s'\n" , \ __FILE__,__LINE__,cudaGetErrorString(e)); \ exit(EXIT_FAILURE); \ } \ } while ( 0 ) #define NCCLCHECK(cmd) do { \ ncclResult_t r = cmd; \ if (r! = ncclSuccess) { \ printf( "Failed, NCCL error %s:%d '%s'\n" , \ __FILE__,__LINE__,ncclGetErrorString(r)); \ exit(EXIT_FAILURE); \ } \ } while ( 0 ) static uint64_t getHostHash(const char * string) { / / Based on DJB2a, result = result * 33 ^ char uint64_t result = 5381 ; for ( int c = 0 ; string[c] ! = '\0' ; c + + ){ result = ((result << 5 ) + result) ^ string[c]; } return result; } static void getHostName(char * hostname, int maxlen) { gethostname(hostname, maxlen); for ( int i = 0 ; i< maxlen; i + + ) { if (hostname[i] = = '.' 
) { hostname[i] = '\0' ; return ; } } } int main( int argc, char * argv[]) { int size = 32 * 1024 * 1024 ; int myRank, nRanks, localRank = 0 ; / / initializing MPI MPICHECK(MPI_Init(&argc, &argv)); MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank)); MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks)); / / calculating localRank which is used in selecting a GPU uint64_t hostHashs[nRanks]; char hostname[ 1024 ]; getHostName(hostname, 1024 ); hostHashs[myRank] = getHostHash(hostname); MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0 , MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD)); for ( int p = 0 ; p<nRanks; p + + ) { if (p = = myRank) break ; if (hostHashs[p] = = hostHashs[myRank]) localRank + + ; } / / each process is using two GPUs int nDev = 2 ; float * * sendbuff = ( float * * )malloc(nDev * sizeof( float * )); float * * recvbuff = ( float * * )malloc(nDev * sizeof( float * )); cudaStream_t * s = (cudaStream_t * )malloc(sizeof(cudaStream_t) * nDev); / / picking GPUs based on localRank for ( int i = 0 ; i < nDev; + + i) { CUDACHECK(cudaSetDevice(localRank * nDev + i)); CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof( float ))); CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof( float ))); CUDACHECK(cudaMemset(sendbuff[i], 1 , size * sizeof( float ))); CUDACHECK(cudaMemset(recvbuff[i], 0 , size * sizeof( float ))); CUDACHECK(cudaStreamCreate(s + i)); } ncclUniqueId id ; ncclComm_t comms[nDev]; / / generating NCCL unique ID at one process and broadcasting it to all if (myRank = = 0 ) ncclGetUniqueId(& id ); MPICHECK(MPI_Bcast((void * )& id , sizeof( id ), MPI_BYTE, 0 , MPI_COMM_WORLD)); / / initializing NCCL, group API is required around ncclCommInitRank as it is / / called across multiple GPUs in each thread / process NCCLCHECK(ncclGroupStart()); for ( int i = 0 ; i<nDev; i + + ) { CUDACHECK(cudaSetDevice(localRank * nDev + i)); NCCLCHECK(ncclCommInitRank(comms + i, nRanks * nDev, id , myRank * nDev + i)); } NCCLCHECK(ncclGroupEnd()); / / calling NCCL 
communication API. Group API is required when using / / multiple devices per thread / process NCCLCHECK(ncclGroupStart()); for ( int i = 0 ; i<nDev; i + + ) NCCLCHECK(ncclAllReduce((const void * )sendbuff[i], (void * )recvbuff[i], size, ncclFloat, ncclSum, comms[i], s[i])); NCCLCHECK(ncclGroupEnd()); / / synchronizing on CUDA stream to complete NCCL communication for ( int i = 0 ; i<nDev; i + + ) CUDACHECK(cudaStreamSynchronize(s[i])); / / freeing device memory for ( int i = 0 ; i<nDev; i + + ) { CUDACHECK(cudaFree(sendbuff[i])); CUDACHECK(cudaFree(recvbuff[i])); } / / finalizing NCCL for ( int i = 0 ; i<nDev; i + + ) { ncclCommDestroy(comms[i]); } / / finalizing MPI MPICHECK(MPI_Finalize()); printf( "[MPI Rank %d] Success \n" , myRank); return 0 ; } |
在上边的示例中,rank0会执行ncclGetUniqueId获取Id,然后通过mpi广播给其他rank,接下来看下UniqueId是怎么产生的。
1
2
3
4
5
|
// Public entry point that fills *out with a new unique id.
// Runs ncclInit() first, then validates the out pointer, and finally
// delegates to bootstrapGetUniqueId(); any failing step's result is returned.
ncclResult_t ncclGetUniqueId(ncclUniqueId* out) {
  NCCLCHECK(ncclInit());
  NCCLCHECK(PtrCheck(out, "GetUniqueId", "out"));
  return bootstrapGetUniqueId(out);
}
然后看下ncclInit
首先执行initEnv,设置环境变量
然后执行initNet,用来初始化nccl所需要的网络,包括两个,一个是bootstrap网络,另外一个是数据通信网络,bootstrap网络主要用于初始化时交换一些简单的信息,比如每个机器的ip端口,由于数据量很小,而且主要是在初始化阶段执行一次,因此bootstrap使用的是tcp;而通信网络是用于实际数据的传输,因此会优先使用rdma(支持gdr的话会优先使用gdr)。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// Initializes the bootstrap network and then selects the data network.
// Selection order: whatever initNetPlugin() stored in ncclNet, else the IB
// implementation (ncclNetIb), else the socket implementation (ncclNetSocket).
// Returns ncclSuccess once ncclNet points at a usable implementation.
ncclResult_t initNet() {
  // Always initialize bootstrap network
  NCCLCHECK(bootstrapNetInit());

  NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet));
  if (ncclNet != NULL) return ncclSuccess;

  if (initNet(&ncclNetIb) == ncclSuccess) {
    ncclNet = &ncclNetIb;
  } else {
    // IB is unavailable: the socket transport is the last resort, so a
    // failure here is propagated to the caller.
    NCCLCHECK(initNet(&ncclNetSocket));
    ncclNet = &ncclNetSocket;
  }
  return ncclSuccess;
}
bootstrapNetInit就是bootstrap网络的初始化,主要就是通过findInterfaces遍历机器上所有的网卡信息,通过prefixList匹配选择使用哪些网卡,将可用网卡的信息保存下来,将ifa_name保存到全局的bootstrapNetIfNames,ip地址保存到全局bootstrapNetIfAddrs,默认除了docker和lo其他的网卡都可以使用。
例如在测试机器上有三张网卡,分别是xgbe0、xgbe1、xgbe2,那么就会把这三个ifaname和对应的ip地址保存下来,另外nccl提供了环境变量NCCL_SOCKET_IFNAME可以用来指定想用的网卡名,例如通过export NCCL_SOCKET_IFNAME=xgbe0来指定使用xgbe0,其实就是通过prefixList来匹配做到的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
static int findInterfaces(const char * prefixList, char * names, union socketAddress * addrs, int sock_family, int maxIfNameSize, int maxIfs) { struct netIf userIfs[MAX_IFS]; bool searchNot = prefixList && prefixList[ 0 ] = = '^' ; if (searchNot) prefixList + + ; bool searchExact = prefixList && prefixList[ 0 ] = = '=' ; if (searchExact) prefixList + + ; int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS); int found = 0 ; struct ifaddrs * interfaces, * interface; getifaddrs(&interfaces); for (interface = interfaces; interface && found < maxIfs; interface = interface - >ifa_next) { if (interface - >ifa_addr = = NULL) continue ; int family = interface - >ifa_addr - >sa_family; if (family ! = AF_INET && family ! = AF_INET6) continue ; if (sock_family ! = - 1 && family ! = sock_family) continue ; if (family = = AF_INET6) { struct sockaddr_in6 * sa = (struct sockaddr_in6 * )(interface - >ifa_addr); if (IN6_IS_ADDR_LOOPBACK(&sa - >sin6_addr)) continue ; } if (!(matchIfList(interface - >ifa_name, - 1 , userIfs, nUserIfs, searchExact) ^ searchNot)) { continue ; } bool duplicate = false; for ( int i = 0 ; i < found; i + + ) { if (strcmp(interface - >ifa_name, names + i * maxIfNameSize) = = 0 ) { duplicate = true; break ; } } if (!duplicate) { strncpy(names + found * maxIfNameSize, interface - >ifa_name, maxIfNameSize); int salen = (family = = AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6); memcpy(addrs + found, interface - >ifa_addr, salen); found + + ; } } freeifaddrs(interfaces); return found; } |
开始初始化通信网络
ncclNet_t结构体是一系列的函数指针,比如初始化,发送,接收等;socket,IB等通信方式都实现了自己的ncclNet_t,如ncclNetSocket,ncclNetIb,初始化通信网络的过程就是依次看哪个通信模式可用,然后赋值给全局的ncclNet。
首先执行initNetPlugin,查看是否有libnccl-net.so,测试环境没有这个so,所以直接返回。
然后尝试使用IB网络:
首先执行ncclNetIb的init函数,就是ncclIbInit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
// Scans the verbs devices and records every usable port in ncclIbDevs.
// Must be called with ncclIbLock held. Sets ncclNIbDevs to 0 before scanning,
// so a failed scan is not retried by later ncclIbInit() calls.
// Returns ncclSuccess, ncclInternalError, or the error from ncclIbGetPciPath().
static ncclResult_t ncclIbDetectDevices() {
  static int shownIbHcaEnv = 0;

  ncclNIbDevs = 0;
  if (findInterfaces(ncclIbIfName, &ncclIbIfAddr, MAX_IF_NAME_SIZE, 1) != 1) {
    WARN("NET/IB : No IP interface found.");
    return ncclInternalError;
  }

  // Detect IB cards
  int nIbDevs;
  struct ibv_device** devices;

  // Check if user defined which IB device:port to use
  char* userIbEnv = getenv("NCCL_IB_HCA");
  if (userIbEnv != NULL && shownIbHcaEnv++ == 0) INFO(NCCL_NET|NCCL_ENV, "NCCL_IB_HCA set to %s", userIbEnv);
  struct netIf userIfs[MAX_IB_DEVS];
  bool searchNot = userIbEnv && userIbEnv[0] == '^';
  if (searchNot) userIbEnv++;
  bool searchExact = userIbEnv && userIbEnv[0] == '=';
  if (searchExact) userIbEnv++;
  int nUserIfs = parseStringList(userIbEnv, userIfs, MAX_IB_DEVS);

  if (ncclSuccess != wrap_ibv_get_device_list(&devices, &nIbDevs)) return ncclInternalError;

  for (int d = 0; d < nIbDevs && ncclNIbDevs < MAX_IB_DEVS; d++) {
    struct ibv_context* context;
    if (ncclSuccess != wrap_ibv_open_device(&context, devices[d]) || context == NULL) {
      WARN("NET/IB : Unable to open device %s", devices[d]->name);
      continue;
    }
    int nPorts = 0;
    struct ibv_device_attr devAttr;
    memset(&devAttr, 0, sizeof(devAttr));
    if (ncclSuccess != wrap_ibv_query_device(context, &devAttr)) {
      WARN("NET/IB : Unable to query device %s", devices[d]->name);
      if (ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
      continue;
    }
    // Port numbers start at 1.
    for (int port = 1; port <= devAttr.phys_port_cnt; port++) {
      struct ibv_port_attr portAttr;
      if (ncclSuccess != wrap_ibv_query_port(context, port, &portAttr)) {
        WARN("NET/IB : Unable to query port %d", port);
        continue;
      }
      if (portAttr.state != IBV_PORT_ACTIVE) continue;
      if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND
          && portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) continue;

      // check against user specified HCAs/ports
      if (!(matchIfList(devices[d]->name, port, userIfs, nUserIfs, searchExact) ^ searchNot)) {
        continue;
      }
      TRACE(NCCL_INIT|NCCL_NET, "NET/IB: [%d] %s:%d/%s ", d, devices[d]->name, port,
          portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
      ncclIbDevs[ncclNIbDevs].device = d;
      ncclIbDevs[ncclNIbDevs].guid = devAttr.sys_image_guid;
      ncclIbDevs[ncclNIbDevs].port = port;
      ncclIbDevs[ncclNIbDevs].link = portAttr.link_layer;
      ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width);
      ncclIbDevs[ncclNIbDevs].context = context;
      strncpy(ncclIbDevs[ncclNIbDevs].devName, devices[d]->name, MAXNAMESIZE);
      NCCLCHECK(ncclIbGetPciPath(ncclIbDevs[ncclNIbDevs].devName, &ncclIbDevs[ncclNIbDevs].pciPath, &ncclIbDevs[ncclNIbDevs].realPort));
      ncclIbDevs[ncclNIbDevs].maxQp = devAttr.max_qp;
      ncclNIbDevs++;
      nPorts++;
      pthread_create(&ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, context);
    }
    // A device with no accepted port is closed again.
    if (nPorts == 0 && ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
  }
  if (nIbDevs && (ncclSuccess != wrap_ibv_free_device_list(devices))) { return ncclInternalError; }
  return ncclSuccess;
}

// Initializes the IB/RoCE transport: loads the verbs symbols, honours the
// "IB disabled" parameter, and on the first call detects the usable ports.
// logFunction is not used in this excerpt.
// Returns ncclSuccess when initialization completed (possibly with zero
// devices), otherwise an error code.
ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) {
  if (wrap_ibv_symbols() != ncclSuccess) { return ncclInternalError; }
  if (ncclParamIbDisable()) return ncclInternalError;

  ncclResult_t ret = ncclSuccess;
  if (ncclNIbDevs == -1) {
    pthread_mutex_lock(&ncclIbLock);
    wrap_ibv_fork_init();
    // Re-check under the lock: another thread may have finished detection.
    // Detection lives in a helper so that each of its early returns still
    // reaches the unlock below instead of leaving ncclIbLock held.
    if (ncclNIbDevs == -1) ret = ncclIbDetectDevices();

    if (ret == ncclSuccess) {
      if (ncclNIbDevs == 0) {
        INFO(NCCL_INIT|NCCL_NET, "NET/IB : No device found.");
      } else {
        char line[1024];
        line[0] = '\0';
        for (int d = 0; d < ncclNIbDevs; d++) {
          snprintf(line + strlen(line), 1023 - strlen(line), " [%d]%s:%d/%s", d, ncclIbDevs[d].devName,
              ncclIbDevs[d].port, ncclIbDevs[d].link == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
        }
        line[1023] = '\0';
        char addrline[1024];
        INFO(NCCL_INIT|NCCL_NET, "NET/IB : Using%s ; OOB %s:%s", line, ncclIbIfName, socketToString(&ncclIbIfAddr.sa, addrline));
      }
    }
    pthread_mutex_unlock(&ncclIbLock);
  }
  return ret;
}
首先第三行通过wrap_ibv_symbols加载动态库libibverbs.so,然后获取动态库的各个函数。
然后通过wrap_ibv_fork_init避免fork引起rdma网卡读写出错。
后面会讲到ib网络也会用到socket进行带外网络的传输,所以这里也通过findInterfaces获取一个可用的网卡保存到ncclIbIfAddr。
通过ibv_get_device_list获取所有rdma设备到devices中,遍历devices的每个device,因为每个HCA可能有多个物理port,所以对每个device遍历每一个物理port,获取每个port的信息。
然后将相关信息保存到全局的ncclIbDevs中,比如是哪个device的哪个port,使用的是IB还是RoCE,device的pci路径,maxqp,device的name等,注意这里也有类似bootstrap网络NCCL_SOCKET_IFNAME的环境变量,叫NCCL_IB_HCA,可以指定使用哪个IB HCA。
到这里整个初始化的过程就完成了,一句话总结就是,获取了当前机器上所有可用的IB网卡和普通以太网卡之后保存下来。
然后开始生成UniqueId:ncclGetUniqueId最后调用的bootstrapGetUniqueId内部会调用bootstrapCreateRoot。
1
2
3
4
5
6
7
8
|
// Opens the bootstrap listen socket and starts the root thread that serves it.
// The storage of *id is reinterpreted as the ncclNetHandle_t that
// bootstrapNetListen() fills in.
// idFromEnv selects the interface argument: dontCareIf when true, interface 0 otherwise.
// Returns ncclSuccess, the bootstrapNetListen() error, or ncclSystemError when
// the root thread cannot be started.
ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) {
  ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;
  void* listenComm;
  NCCLCHECK(bootstrapNetListen(idFromEnv ? dontCareIf : 0, netHandle, &listenComm));

  pthread_t thread;
  // Without the root thread nobody would ever accept on the listen socket,
  // so a creation failure must not be reported as success.
  // NOTE(review): listenComm is not released on this path; no close routine
  // is visible in this excerpt.
  if (pthread_create(&thread, NULL, bootstrapRoot, listenComm) != 0) {
    return ncclSystemError;
  }
  // The handle goes out of scope here and is never joined, so detach it.
  pthread_detach(thread);
  return ncclSuccess;
}
ncclNetHandle_t也是一个字符数组,然后执行bootstrapNetListen。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// Creates a listening bootstrap socket and returns it through *listenComm.
// The storage of *netHandle is used as a union socketAddress: it selects the
// address to listen on, and createListenSocket() receives it as its address argument.
// dev >= 0 picks the address of bootstrap interface number dev.
static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm) {
  union socketAddress* connectAddr = (union socketAddress*) netHandle;
  static_assert(sizeof(union socketAddress) < NCCL_NET_HANDLE_MAXSIZE, "union socketAddress size is too large");
  // if dev >= 0, listen based on dev
  if (dev >= 0) {
    NCCLCHECK(bootstrapNetGetSocketAddr(dev, connectAddr));
  } else if (dev == findSubnetIf) {
    // (this branch is left as "..." in the excerpt)
    ...
  } // Otherwise, handle stores a local address
  struct bootstrapNetComm* comm;
  NCCLCHECK(bootstrapNetNewComm(&comm));
  NCCLCHECK(createListenSocket(&comm->fd, connectAddr));
  *listenComm = comm;
  return ncclSuccess;
}
依次看下这三个函数,通过bootstrapNetGetSocketAddr获取一个可用的ip地址。
1
2
3
4
5
|
// Copies the address of bootstrap interface number dev into *addr.
// Returns ncclInternalError when dev is outside [0, bootstrapNetIfs).
static ncclResult_t bootstrapNetGetSocketAddr(int dev, union socketAddress* addr) {
  // A negative index would read before the start of bootstrapNetIfAddrs.
  if (dev < 0 || dev >= bootstrapNetIfs) return ncclInternalError;
  memcpy(addr, bootstrapNetIfAddrs + dev, sizeof(*addr));
  return ncclSuccess;
}
此时dev是0, bootstrapNetIfs是初始化bootstrap网络的时候一共找到了几个可用的网卡,这里就是获取了第0个可用的ip地址。
然后通过bootstrapNetNewComm创建bootstrapNetComm,bootstrapNetComm其实就是fd,bootstrapNetNewComm其实就是new了一个bootstrapNetComm。
1
2
3
|
// Bootstrap communication handle: wraps a single file descriptor.
struct bootstrapNetComm {
  int fd;  // descriptor stored by the bootstrap code (e.g. the listen socket)
};
通过createListenSocket启动socket server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
static ncclResult_t createListenSocket( int * fd, union socketAddress * localAddr) { / * IPv4 / IPv6 support * / int family = localAddr - >sa.sa_family; int salen = (family = = AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6); / * Create socket and bind it to a port * / int sockfd = socket(family, SOCK_STREAM, 0 ); if (sockfd = = - 1 ) { WARN( "Net : Socket creation failed : %s" , strerror(errno)); return ncclSystemError; } if (socketToPort(&localAddr - >sa)) { / / Port is forced by env. Make sure we get the port. int opt = 1 ; #if defined(SO_REUSEPORT) SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt" ); #else SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt" ); #endif } / / localAddr port should be 0 ( Any port) SYSCHECK(bind(sockfd, &localAddr - >sa, salen), "bind" ); / * Get the assigned Port * / socklen_t size = salen; SYSCHECK(getsockname(sockfd, &localAddr - >sa, &size), "getsockname" ); #ifdef ENABLE_TRACE char line[ 1024 ]; TRACE(NCCL_INIT|NCCL_NET, "Listening on socket %s" , socketToString(&localAddr - >sa, line)); #endif / * Put the socket in listen mode * NB: The backlog will be silently truncated to the value in / proc / sys / net / core / somaxconn * / SYSCHECK(listen(sockfd, 16384 ), "listen" ); * fd = sockfd; return ncclSuccess; } |
创建监听fd,ip由localaddr指定,初始端口为0,bind时随机找一个可用端口,并通过getsockname(sockfd, &localAddr->sa, &size)将ip端口写回到localaddr,这里localaddr就是UniqueId。
到这里UniqueId也就产生了,其实就是当前机器的ip和port。
欢迎 Star、试用 OneFlow 最新版本:github.com/Oneflow-Inc…
以上就是NCCL源码解析之初始化及ncclUniqueId的产生详解的详细内容,更多关于NCCL初始化ncclUniqueId产生的资料请关注服务器之家其它相关文章!
原文链接:https://blog.csdn.net/KIDGIN7439/article/details/126712106?