-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsignup_workflow.py
126 lines (110 loc) · 4.7 KB
/
signup_workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from dataclasses import dataclass
from datetime import timedelta
from iwf.command_request import CommandRequest, InternalChannelCommand, TimerCommand
from iwf.command_results import CommandResults
from iwf.communication import Communication
from iwf.communication_schema import CommunicationSchema, CommunicationMethod
from iwf.iwf_api.models import ChannelRequestStatus, PersistenceLoadingPolicy, PersistenceLoadingType
from iwf.persistence import Persistence
from iwf.persistence_schema import PersistenceField, PersistenceSchema
from iwf.rpc import rpc
from iwf.state_decision import StateDecision
from iwf.state_schema import StateSchema
from iwf.workflow import ObjectWorkflow
from iwf.workflow_context import WorkflowContext
from iwf.workflow_state import T, WorkflowState
@dataclass
class Form:
username: str
email: str
firstname: str
lastname: str
data_attribute_form = "form"
verify_channel = "verify"
data_attribute_status = "status"
data_attribute_verified_source = "source"
class SubmitState(WorkflowState[Form]):
def execute(
self,
ctx: WorkflowContext,
input: Form,
command_results: CommandResults,
persistence: Persistence,
communication: Communication,
) -> StateDecision:
persistence.set_data_attribute(data_attribute_form, input)
persistence.set_data_attribute(data_attribute_status, "waiting")
print(f"API to send verification email to {input.email}")
return StateDecision.single_next_state(VerifyState)
class VerifyState(WorkflowState[None]):
def wait_until(
self,
ctx: WorkflowContext,
input: T,
persistence: Persistence,
communication: Communication,
) -> CommandRequest:
return CommandRequest.for_any_command_completed(
TimerCommand.timer_command_by_duration(
timedelta(seconds=10)
), # use 10 seconds for demo
InternalChannelCommand.by_name(verify_channel),
)
def execute(
self,
ctx: WorkflowContext,
input: T,
command_results: CommandResults,
persistence: Persistence,
communication: Communication,
) -> StateDecision:
form = persistence.get_data_attribute(data_attribute_form)
if (
command_results.internal_channel_commands[0].status
== ChannelRequestStatus.RECEIVED
):
print(f"API to send welcome email to {form.email}")
return StateDecision.graceful_complete_workflow("done")
else:
print(f"API to send the a reminder email to {form.email}")
return StateDecision.single_next_state(VerifyState)
class UserSignupWorkflow(ObjectWorkflow):
def get_workflow_states(self) -> StateSchema:
return StateSchema.with_starting_state(SubmitState(), VerifyState())
def get_persistence_schema(self) -> PersistenceSchema:
return PersistenceSchema.create(
PersistenceField.data_attribute_def(data_attribute_form, Form),
PersistenceField.data_attribute_def(data_attribute_status, str),
PersistenceField.data_attribute_def(data_attribute_verified_source, str),
)
def get_communication_schema(self) -> CommunicationSchema:
return CommunicationSchema.create(
CommunicationMethod.internal_channel_def(verify_channel, None)
)
@rpc(
data_attribute_loading_policy=PersistenceLoadingPolicy(
persistence_loading_type=PersistenceLoadingType.LOAD_PARTIAL_WITH_EXCLUSIVE_LOCK,
locking_keys=[data_attribute_form, data_attribute_status, data_attribute_verified_source],
)
)
def verify(
self, source: str, persistence: Persistence, communication: Communication
) -> str:
status = persistence.get_data_attribute(data_attribute_status)
if status == "verified":
return "already verified"
persistence.set_data_attribute(data_attribute_status, "verified")
persistence.set_data_attribute(data_attribute_verified_source, source)
communication.publish_to_internal_channel(verify_channel)
return "done"
@rpc(
data_attribute_loading_policy=PersistenceLoadingPolicy(
persistence_loading_type=PersistenceLoadingType.LOAD_PARTIAL_WITH_EXCLUSIVE_LOCK,
locking_keys=[data_attribute_form, data_attribute_status, data_attribute_verified_source],
)
)
def describe(self, persistence: Persistence) -> tuple:
form = persistence.get_data_attribute(data_attribute_form)
status = persistence.get_data_attribute(data_attribute_status)
source = persistence.get_data_attribute(data_attribute_verified_source)
return form, status, source