-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplugin.py
76 lines (64 loc) · 2.3 KB
/
plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import json
from typing import List, Generator
import structlog
from cloudquery.sdk import message
from cloudquery.sdk import plugin
from cloudquery.sdk import schema
from cloudquery.sdk.scheduler import Scheduler, TableResolver
from plugin import tables
from plugin.client import Client, Spec
PLUGIN_NAME = "example"
PLUGIN_VERSION = "0.0.1"
TEAM_NAME = "cloudquery"
PLUGIN_KIND = "source"
class ExamplePlugin(plugin.Plugin):
def __init__(self) -> None:
super().__init__(
PLUGIN_NAME, PLUGIN_VERSION, plugin.plugin.Options(team=TEAM_NAME, kind=PLUGIN_KIND)
)
self._spec_json = None
self._spec = None
self._scheduler = None
self._client = None
self._logger = structlog.get_logger()
def set_logger(self, logger) -> None:
self._logger = logger
def init(self, spec, no_connection: bool = False):
if no_connection:
return
self._spec_json = json.loads(spec)
self._spec = Spec(**self._spec_json)
self._spec.validate()
self._scheduler = Scheduler(
self._spec.concurrency, self._spec.queue_size, logger=self._logger
)
self._client = Client(self._spec)
def get_tables(self, options: plugin.TableOptions) -> List[plugin.Table]:
all_tables: List[plugin.Table] = [
tables.Items(),
]
# set parent table relationships
for table in all_tables:
for relation in table.relations:
relation.parent = table
# set initial values
if options.tables is None:
options.tables = []
if options.skip_tables is None:
options.skip_tables = []
return schema.filter_dfs(all_tables, options.tables, options.skip_tables)
def sync(
self, options: plugin.SyncOptions
) -> Generator[message.SyncMessage, None, None]:
resolvers: list[TableResolver] = []
for table in self.get_tables(
plugin.TableOptions(
tables=options.tables,
skip_tables=options.skip_tables,
skip_dependent_tables=options.skip_dependent_tables,
)
):
resolvers.append(table.resolver)
return self._scheduler.sync(
self._client, resolvers, options.deterministic_cq_id
)