# test_executor.py (from a fork of cloud-custodian/cloud-custodian)
# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
from c7n import executor
import unittest
class Foo:
    """Fixture whose entry points all echo back the arguments they receive.

    It offers the same echo behaviour through four kinds of callable: a plain
    instance method, a static method, a class method, and the instance itself
    via ``__call__``. Each returns the pair ``(args, kw)``, where ``args`` is
    the tuple of positional arguments and ``kw`` the dict of keyword arguments.
    """

    def __init__(self, state):
        # Stored only; none of the methods on this class read it back.
        self.state = state

    def abc(self, *args, **kw):
        """Return ``(args, kw)`` exactly as passed (instance method)."""
        return args, kw

    @staticmethod
    def run(*args, **kw):
        """Return ``(args, kw)`` exactly as passed (static method)."""
        return args, kw

    @classmethod
    def execute(cls, *args, **kw):
        """Return ``(args, kw)`` exactly as passed (class method)."""
        return args, kw

    def __call__(self, *args, **kw):
        """Return ``(args, kw)`` exactly as passed when the instance is called."""
        return args, kw
class ExecutorBase:
    """Shared executor test, meant to be mixed into concrete test cases.

    This class defines neither ``executor_factory`` nor ``assertEqual``; the
    test below reads both from ``self``. It therefore only runs from a
    subclass that also derives from ``unittest.TestCase`` and assigns
    ``executor_factory``.
    """

    def test_map_instance(self):
        """Map a callable ``Foo`` instance over three inputs and check the results."""
        expected = [((1,), {}), ((2,), {}), ((3,), {})]
        with self.executor_factory(max_workers=3) as w:
            # Foo.__call__ echoes (args, kw), so every input is expected back
            # as a 1-tuple of positionals with an empty keyword dict, in the
            # same order as the inputs.
            self.assertEqual(list(w.map(Foo("123"), [1, 2, 3])), expected)
class ThreadExecutorTest(ExecutorBase, unittest.TestCase):
    """Run the tests inherited from ``ExecutorBase`` with ``executor.ThreadPoolExecutor``."""

    # Read by the inherited tests as ``self.executor_factory(max_workers=...)``.
    executor_factory = executor.ThreadPoolExecutor
class MainExecutorTest(ExecutorBase, unittest.TestCase):
    """Run the tests inherited from ``ExecutorBase`` with ``executor.MainThreadExecutor``."""

    # Read by the inherited tests as ``self.executor_factory(max_workers=...)``.
    executor_factory = executor.MainThreadExecutor
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()