-
Notifications
You must be signed in to change notification settings - Fork 208
/
Copy pathes_last_updated.py
77 lines (64 loc) · 2.4 KB
/
es_last_updated.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
from typing import Iterator, Union
from amundsen_rds.models import RDSModel
from amundsen_rds.models.updated_timestamp import UpdatedTimestamp as RDSUpdatedTimestamp
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.table_serializable import TableSerializable
class ESLastUpdated(GraphSerializable, TableSerializable):
"""
Data model to keep track the last updated timestamp for
datastore and es.
"""
LABEL = 'Updatedtimestamp'
KEY = 'amundsen_updated_timestamp'
LATEST_TIMESTAMP = 'latest_timestamp'
def __init__(self,
timestamp: int,
) -> None:
"""
:param timestamp: epoch for latest updated timestamp for neo4j an es
"""
self.timestamp = timestamp
self._node_iter = self._create_node_iterator()
self._rel_iter = self._create_relation_iterator()
self._record_iter = self._create_record_iterator()
def create_next_node(self) -> Union[GraphNode, None]:
"""
Will create an orphan node for last updated timestamp.
"""
try:
return next(self._node_iter)
except StopIteration:
return None
def _create_node_iterator(self) -> Iterator[GraphNode]:
"""
Create an es_updated_timestamp node
"""
node = GraphNode(
key=ESLastUpdated.KEY,
label=ESLastUpdated.LABEL,
attributes={
ESLastUpdated.LATEST_TIMESTAMP: self.timestamp
}
)
yield node
def create_next_relation(self) -> Union[GraphRelationship, None]:
try:
return next(self._rel_iter)
except StopIteration:
return None
def _create_relation_iterator(self) -> Iterator[GraphRelationship]:
return
yield
def create_next_record(self) -> Union[RDSModel, None]:
try:
return next(self._record_iter)
except StopIteration:
return None
def _create_record_iterator(self) -> Iterator[RDSModel]:
record = RDSUpdatedTimestamp(rk=ESLastUpdated.KEY,
latest_timestamp=self.timestamp)
yield record