# Copyright (c) OpenMMLab. All rights reserved.
import os
import os.path as osp
import tempfile
import unittest
from itertools import product
from unittest import TestCase
from unittest.mock import patch

import torch
import torch.distributed as torch_dist

import mmengine.dist as dist
from mmengine.dist.dist import sync_random_seed
from mmengine.testing._internal import MultiProcessTestCase
from mmengine.utils import TORCH_VERSION, digit_version


class TestDist(TestCase):
"""Test dist module in non-distributed environment."""
def test_all_reduce(self):
data = torch.arange(2, dtype=torch.int64)
expected = torch.arange(2, dtype=torch.int64)
dist.all_reduce(data)
self.assertTrue(torch.allclose(data, expected))
def test_all_gather(self):
data = torch.arange(2, dtype=torch.int64)
expected = torch.arange(2, dtype=torch.int64)
output = dist.all_gather(data)
self.assertTrue(torch.allclose(output[0], expected))
def test_gather(self):
data = torch.arange(2, dtype=torch.int64)
expected = torch.arange(2, dtype=torch.int64)
output = dist.gather(data)
self.assertTrue(torch.allclose(output[0], expected))
def test_broadcast(self):
data = torch.arange(2, dtype=torch.int64)
expected = torch.arange(2, dtype=torch.int64)
dist.broadcast(data)
self.assertTrue(torch.allclose(data, expected))
@patch('numpy.random.randint', return_value=10)
def test_sync_random_seed(self, mock):
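        # in a non-distributed environment, `sync_random_seed` simply
        # returns the locally generated seed, mocked here to be 10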
self.assertEqual(sync_random_seed(), 10)
def test_broadcast_object_list(self):
with self.assertRaises(AssertionError):
            # input should be a list of objects
dist.broadcast_object_list('foo')
data = ['foo', 12, {1: 2}]
expected = ['foo', 12, {1: 2}]
dist.broadcast_object_list(data)
self.assertEqual(data, expected)
def test_all_reduce_dict(self):
with self.assertRaises(AssertionError):
            # input should be a dict
dist.all_reduce_dict('foo')
data = {
'key1': torch.arange(2, dtype=torch.int64),
'key2': torch.arange(3, dtype=torch.int64)
}
expected = {
'key1': torch.arange(2, dtype=torch.int64),
'key2': torch.arange(3, dtype=torch.int64)
}
dist.all_reduce_dict(data)
for key in data:
self.assertTrue(torch.allclose(data[key], expected[key]))
def test_all_gather_object(self):
data = 'foo'
expected = 'foo'
gather_objects = dist.all_gather_object(data)
self.assertEqual(gather_objects[0], expected)
def test_gather_object(self):
data = 'foo'
expected = 'foo'
gather_objects = dist.gather_object(data)
self.assertEqual(gather_objects[0], expected)
def test_collect_results(self):
data = ['foo', {1: 2}]
size = 2
expected = ['foo', {1: 2}]
# test `device=cpu`
output = dist.collect_results(data, size, device='cpu')
self.assertEqual(output, expected)
# test `device=gpu`
output = dist.collect_results(data, size, device='gpu')
self.assertEqual(output, expected)
def test_all_reduce_params(self):
for tensor_type, reduce_op in zip([torch.int64, torch.float32],
['sum', 'mean']):
data = [
torch.tensor([0, 1], dtype=tensor_type) for _ in range(100)
]
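            # `all_reduce_params` accepts any iterable of tensors, so a
            # generator is passed here to exercise that code path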
data_gen = (item for item in data)
expected = [
torch.tensor([0, 1], dtype=tensor_type) for _ in range(100)
]
dist.all_reduce_params(data_gen, op=reduce_op)
for item1, item2 in zip(data, expected):
                self.assertTrue(torch.allclose(item1, item2))


class TestDistWithGLOOBackend(MultiProcessTestCase):
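    """Test dist module in a GLOO-backend distributed environment."""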
def _init_dist_env(self, rank, world_size):
"""Initialize the distributed environment."""
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29505'
os.environ['RANK'] = str(rank)
torch_dist.init_process_group(
backend='gloo', rank=rank, world_size=world_size)
def setUp(self):
super().setUp()
self._spawn_processes()
def test_all_reduce(self):
self._init_dist_env(self.rank, self.world_size)
tensor_types = [torch.int64, torch.float32, torch.int64]
reduce_ops = ['sum', 'mean', 'mean']
for tensor_type, reduce_op in zip(tensor_types, reduce_ops):
if dist.get_rank() == 0:
data = torch.tensor([1, 2], dtype=tensor_type)
else:
data = torch.tensor([3, 4], dtype=tensor_type)
if reduce_op == 'sum':
expected = torch.tensor([4, 6], dtype=tensor_type)
else:
expected = torch.tensor([2, 3], dtype=tensor_type)
dist.all_reduce(data, reduce_op)
self.assertTrue(torch.allclose(data, expected))
def test_all_gather(self):
self._init_dist_env(self.rank, self.world_size)
if dist.get_rank() == 0:
data = torch.tensor([0, 1])
else:
data = torch.tensor([1, 2])
expected = [torch.tensor([0, 1]), torch.tensor([1, 2])]
output = dist.all_gather(data)
self.assertTrue(
torch.allclose(output[dist.get_rank()], expected[dist.get_rank()]))
def test_gather(self):
self._init_dist_env(self.rank, self.world_size)
if dist.get_rank() == 0:
data = torch.tensor([0, 1])
else:
data = torch.tensor([1, 2])
output = dist.gather(data)
        if dist.get_rank() == 0:
            expected = [torch.tensor([0, 1]), torch.tensor([1, 2])]
            for i in range(2):
                assert torch.allclose(output[i], expected[i])
        else:
            # on non-destination ranks, `gather` returns an empty list
            assert output == []
def test_broadcast_dist(self):
self._init_dist_env(self.rank, self.world_size)
if dist.get_rank() == 0:
data = torch.tensor([0, 1])
        else:
            data = torch.tensor([1, 2])
expected = torch.tensor([0, 1])
dist.broadcast(data, 0)
assert torch.allclose(data, expected)
def test_sync_random_seed(self):
self._init_dist_env(self.rank, self.world_size)
with patch.object(
torch, 'tensor',
return_value=torch.tensor(1024)) as mock_tensor:
output = dist.sync_random_seed()
assert output == 1024
mock_tensor.assert_called()
def test_broadcast_object_list(self):
self._init_dist_env(self.rank, self.world_size)
if dist.get_rank() == 0:
data = ['foo', 12, {1: 2}]
else:
data = [None, None, None]
expected = ['foo', 12, {1: 2}]
dist.broadcast_object_list(data)
self.assertEqual(data, expected)
def test_all_reduce_dict(self):
self._init_dist_env(self.rank, self.world_size)
for tensor_type, reduce_op in zip([torch.int64, torch.float32],
['sum', 'mean']):
if dist.get_rank() == 0:
data = {
'key1': torch.tensor([0, 1], dtype=tensor_type),
'key2': torch.tensor([1, 2], dtype=tensor_type),
}
else:
data = {
'key1': torch.tensor([2, 3], dtype=tensor_type),
'key2': torch.tensor([3, 4], dtype=tensor_type),
}
if reduce_op == 'sum':
expected = {
'key1': torch.tensor([2, 4], dtype=tensor_type),
'key2': torch.tensor([4, 6], dtype=tensor_type),
}
else:
expected = {
'key1': torch.tensor([1, 2], dtype=tensor_type),
'key2': torch.tensor([2, 3], dtype=tensor_type),
}
dist.all_reduce_dict(data, reduce_op)
for key in data:
assert torch.allclose(data[key], expected[key])
        # `torch.cat` in torch 1.5 cannot concatenate tensors of different
        # types, so we fall back to converting them all to float.
if digit_version(TORCH_VERSION) == digit_version('1.5.0'):
if dist.get_rank() == 0:
data = {
'key1': torch.tensor([0, 1], dtype=torch.float32),
'key2': torch.tensor([1, 2], dtype=torch.int32)
}
else:
data = {
'key1': torch.tensor([2, 3], dtype=torch.float32),
'key2': torch.tensor([3, 4], dtype=torch.int32),
}
expected = {
'key1': torch.tensor([2, 4], dtype=torch.float32),
'key2': torch.tensor([4, 6], dtype=torch.float32),
}
dist.all_reduce_dict(data, 'sum')
for key in data:
assert torch.allclose(data[key], expected[key])
def test_all_gather_object(self):
self._init_dist_env(self.rank, self.world_size)
        # data is a picklable Python object
if dist.get_rank() == 0:
data = 'foo'
else:
data = {1: 2}
expected = ['foo', {1: 2}]
output = dist.all_gather_object(data)
self.assertEqual(output, expected)
        # data is a list of picklable Python objects
if dist.get_rank() == 0:
data = ['foo', {1: 2}]
else:
data = {2: 3}
expected = [['foo', {1: 2}], {2: 3}]
output = dist.all_gather_object(data)
self.assertEqual(output, expected)
def test_gather_object(self):
self._init_dist_env(self.rank, self.world_size)
        # data is a picklable Python object
if dist.get_rank() == 0:
data = 'foo'
else:
data = {1: 2}
output = dist.gather_object(data, dst=0)
if dist.get_rank() == 0:
self.assertEqual(output, ['foo', {1: 2}])
else:
self.assertIsNone(output)
        # data is a list of picklable Python objects
if dist.get_rank() == 0:
data = ['foo', {1: 2}]
else:
data = {2: 3}
output = dist.gather_object(data, dst=0)
if dist.get_rank() == 0:
self.assertEqual(output, [['foo', {1: 2}], {2: 3}])
else:
self.assertIsNone(output)
def test_all_reduce_params(self):
self._init_dist_env(self.rank, self.world_size)
tensor_types = [torch.int64, torch.float32]
reduce_ops = ['sum', 'mean']
coalesces = [True, False]
for tensor_type, reduce_op, coalesce in zip(tensor_types, reduce_ops,
coalesces):
if dist.get_rank() == 0:
data = [
torch.tensor([0, 1], dtype=tensor_type) for _ in range(100)
]
            else:
                data = [
                    torch.tensor([2, 3], dtype=tensor_type)
                    for _ in range(100)
                ]
            # keep `data` and `expected` as lists so they can be re-iterated
            # in the final check; only `data_gen` needs to be a generator
            data_gen = (item for item in data)
            if reduce_op == 'sum':
                expected = [
                    torch.tensor([2, 4], dtype=tensor_type)
                    for _ in range(100)
                ]
            else:
                expected = [
                    torch.tensor([1, 2], dtype=tensor_type)
                    for _ in range(100)
                ]
dist.all_reduce_params(data_gen, coalesce=coalesce, op=reduce_op)
for item1, item2 in zip(data, expected):
                self.assertTrue(torch.allclose(item1, item2))


@unittest.skipIf(
    torch.cuda.device_count() < 2, reason='need 2 GPUs to test nccl')
class TestDistWithNCCLBackend(MultiProcessTestCase):
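    """Test dist module in an NCCL-backend distributed environment."""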
def _init_dist_env(self, rank, world_size):
"""Initialize the distributed environment."""
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29505'
os.environ['RANK'] = str(rank)
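        # pin each process to its own GPU, as required by the NCCL backend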
num_gpus = torch.cuda.device_count()
torch.cuda.set_device(rank % num_gpus)
torch_dist.init_process_group(
backend='nccl', rank=rank, world_size=world_size)
def setUp(self):
super().setUp()
self._spawn_processes()
def test_all_reduce(self):
self._init_dist_env(self.rank, self.world_size)
tensor_types = [torch.int64, torch.float32]
reduce_ops = ['sum', 'mean']
device_types = ['cpu', 'cuda']
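        # exercise both CPU and CUDA inputs; the dist ops are expected to
        # move data to the NCCL communication device as needed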
for tensor_type, reduce_op, device_type in product(
tensor_types, reduce_ops, device_types):
# 'mean' op does not support torch.int64
if tensor_type == torch.int64 and reduce_op == 'mean':
continue
if dist.get_rank() == 0:
data = torch.tensor([1, 2], dtype=tensor_type).to(device_type)
else:
data = torch.tensor([3, 4], dtype=tensor_type).to(device_type)
if reduce_op == 'sum':
expected = torch.tensor([4, 6],
dtype=tensor_type).to(device_type)
else:
expected = torch.tensor([2, 3],
dtype=tensor_type).to(device_type)
dist.all_reduce(data, reduce_op)
self.assertTrue(torch.allclose(data, expected))
def test_all_gather(self):
self._init_dist_env(self.rank, self.world_size)
for device_type in ('cpu', 'cuda'):
if dist.get_rank() == 0:
data = torch.tensor([0, 1]).to(device_type)
else:
data = torch.tensor([1, 2]).to(device_type)
expected = [
torch.tensor([0, 1]).to(device_type),
torch.tensor([1, 2]).to(device_type)
]
output = dist.all_gather(data)
self.assertTrue(
torch.allclose(output[dist.get_rank()],
expected[dist.get_rank()]))
def test_broadcast_dist(self):
self._init_dist_env(self.rank, self.world_size)
for device_type in ('cpu', 'cuda'):
if dist.get_rank() == 0:
data = torch.tensor([0, 1]).to(device_type)
else:
data = torch.tensor([1, 2]).to(device_type)
expected = torch.tensor([0, 1]).to(device_type)
dist.broadcast(data, 0)
assert torch.allclose(data, expected)
def test_sync_random_seed(self):
self._init_dist_env(self.rank, self.world_size)
with patch.object(
torch, 'tensor',
return_value=torch.tensor(1024)) as mock_tensor:
output = dist.sync_random_seed()
assert output == 1024
mock_tensor.assert_called()
def test_broadcast_object_list(self):
self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = ['foo', 12, {1: 2}]
        else:
            data = [None, None, None]
expected = ['foo', 12, {1: 2}]
dist.broadcast_object_list(data)
self.assertEqual(data, expected)
def test_all_reduce_dict(self):
self._init_dist_env(self.rank, self.world_size)
tensor_types = [torch.int64, torch.float32]
reduce_ops = ['sum', 'mean']
device_types = ['cpu', 'cuda']
for tensor_type, reduce_op, device_type in product(
tensor_types, reduce_ops, device_types):
# 'mean' op does not support torch.int64
if tensor_type == torch.int64 and reduce_op == 'mean':
continue
if dist.get_rank() == 0:
data = {
'key1':
torch.tensor([0, 1], dtype=tensor_type).to(device_type),
'key2':
torch.tensor([1, 2], dtype=tensor_type).to(device_type),
}
else:
data = {
'key1':
torch.tensor([2, 3], dtype=tensor_type).to(device_type),
'key2':
torch.tensor([3, 4], dtype=tensor_type).to(device_type),
}
if reduce_op == 'sum':
expected = {
'key1':
torch.tensor([2, 4], dtype=tensor_type).to(device_type),
'key2':
torch.tensor([4, 6], dtype=tensor_type).to(device_type),
}
else:
expected = {
'key1':
torch.tensor([1, 2], dtype=tensor_type).to(device_type),
'key2':
torch.tensor([2, 3], dtype=tensor_type).to(device_type),
}
dist.all_reduce_dict(data, reduce_op)
for key in data:
assert torch.allclose(data[key], expected[key])
        # `torch.cat` in torch 1.5 cannot concatenate tensors of different
        # types, so we fall back to converting them all to float.
for device_type in ('cpu', 'cuda'):
if digit_version(TORCH_VERSION) == digit_version('1.5.0'):
if dist.get_rank() == 0:
data = {
'key1':
torch.tensor([0, 1],
dtype=torch.float32).to(device_type),
'key2':
torch.tensor([1, 2],
dtype=torch.int32).to(device_type),
}
else:
data = {
'key1':
torch.tensor([2, 3],
dtype=torch.float32).to(device_type),
'key2':
torch.tensor([3, 4],
dtype=torch.int32).to(device_type),
}
expected = {
'key1':
torch.tensor([2, 4], dtype=torch.float32).to(device_type),
'key2':
torch.tensor([4, 6], dtype=torch.float32).to(device_type),
}
dist.all_reduce_dict(data, 'sum')
for key in data:
assert torch.allclose(data[key], expected[key])
def test_all_gather_object(self):
self._init_dist_env(self.rank, self.world_size)
        # data is a picklable Python object
        if dist.get_rank() == 0:
            data = 'foo'
        else:
            data = {1: 2}
expected = ['foo', {1: 2}]
output = dist.all_gather_object(data)
self.assertEqual(output, expected)
        # data is a list of picklable Python objects
if dist.get_rank() == 0:
data = ['foo', {1: 2}]
else:
data = {2: 3}
expected = [['foo', {1: 2}], {2: 3}]
output = dist.all_gather_object(data)
self.assertEqual(output, expected)
def test_collect_results(self):
self._init_dist_env(self.rank, self.world_size)
# 1. test `device` and `tmpdir` parameters
if dist.get_rank() == 0:
data = ['foo', {1: 2}]
else:
data = [24, {'a': 'b'}]
size = 4
expected = ['foo', 24, {1: 2}, {'a': 'b'}]
# 1.1 test `device=cpu` and `tmpdir` is None
output = dist.collect_results(data, size, device='cpu')
if dist.get_rank() == 0:
self.assertEqual(output, expected)
else:
self.assertIsNone(output)
# 1.2 test `device=cpu` and `tmpdir` is not None
tmpdir = tempfile.mkdtemp()
# broadcast tmpdir to all ranks to make it consistent
object_list = [tmpdir]
dist.broadcast_object_list(object_list)
output = dist.collect_results(
data, size, device='cpu', tmpdir=object_list[0])
if dist.get_rank() == 0:
self.assertEqual(output, expected)
else:
self.assertIsNone(output)
if dist.get_rank() == 0:
# object_list[0] will be removed by `dist.collect_results`
self.assertFalse(osp.exists(object_list[0]))
# 1.3 test `device=gpu`
output = dist.collect_results(data, size, device='gpu')
if dist.get_rank() == 0:
self.assertEqual(output, expected)
else:
self.assertIsNone(output)
# 2. test `size` parameter
if dist.get_rank() == 0:
data = ['foo', {1: 2}]
else:
data = [24, {'a': 'b'}]
size = 3
expected = ['foo', 24, {1: 2}]
# 2.1 test `device=cpu` and `tmpdir` is None
output = dist.collect_results(data, size, device='cpu')
if dist.get_rank() == 0:
self.assertEqual(output, expected)
else:
self.assertIsNone(output)
# 2.2 test `device=gpu`
output = dist.collect_results(data, size, device='gpu')
if dist.get_rank() == 0:
self.assertEqual(output, expected)
else:
self.assertIsNone(output)
def test_all_reduce_params(self):
self._init_dist_env(self.rank, self.world_size)
tensor_types = [torch.int64, torch.float32]
reduce_ops = ['sum', 'mean']
coalesces = [True, False]
device_types = ['cpu', 'cuda']
for tensor_type, reduce_op, coalesce, device_type in zip(
tensor_types, reduce_ops, coalesces, device_types):
if dist.get_rank() == 0:
data = [
torch.tensor([0, 1], dtype=tensor_type).to(device_type)
for _ in range(100)
]
else:
data = [
torch.tensor([2, 3], dtype=tensor_type).to(device_type)
for _ in range(100)
]
            data_gen = (item for item in data)
            if reduce_op == 'sum':
                expected = [
                    torch.tensor([2, 4], dtype=tensor_type).to(device_type)
                    for _ in range(100)
                ]
            else:
                expected = [
                    torch.tensor([1, 2], dtype=tensor_type).to(device_type)
                    for _ in range(100)
                ]
            dist.all_reduce_params(data_gen, coalesce=coalesce, op=reduce_op)
            for item1, item2 in zip(data, expected):
                self.assertTrue(torch.allclose(item1, item2))