Skip to content

Commit

Permalink
feature/#416 documentation for core notification (#1047)
Browse files Browse the repository at this point in the history
* added simple documentation for core notification

* moved topic content from event to topic file

* added info about event

* added more info about consumer

* change doc

* change doc

* added flake8

* added examples

* Update docs/manuals/userman/sdm/events/code-example/user-changes-notifier.py

Co-authored-by: Jean-Robin <[email protected]>

* Update docs/manuals/userman/sdm/events/code-example/user-changes-notifier.py

Co-authored-by: Jean-Robin <[email protected]>

* added examples and minor fixes

---------

Co-authored-by: Toan Quach <[email protected]>
Co-authored-by: Jean-Robin <[email protected]>
  • Loading branch information
3 people committed Sep 3, 2024
1 parent 5cbdf5d commit 4c2845a
Show file tree
Hide file tree
Showing 8 changed files with 457 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from time import sleep

from taipy.core.notification import CoreEventConsumerBase, Event, Notifier


# Define a custom event consumer.
class MyEventConsumer(CoreEventConsumerBase):
def process_event(self, event: Event):
# Custom event processing logic here
print(f"Received a {event.entity_type} {event.operation} event at : {event.creation_date}")


if __name__ == "__main__":
import taipy as tp
from taipy import Config

# Create a scenario configuration.
some_datanode_cfg = Config.configure_data_node("data", default_data="Some content.")
print_task_cfg = Config.configure_task("print", print, some_datanode_cfg)
scenario_config = Config.configure_scenario("scenario", [print_task_cfg])

# Run the core service.
orchestrator = tp.Orchestrator().run()

# Register to the Notifier to retrieve a registration id and a registered queue.
registration_id, registered_queue = Notifier.register()

# Create a consumer and start it.
consumer = MyEventConsumer(registration_id, registered_queue)
consumer.start()

# The scenario creation and submission will trigger event emissions.
scenario = tp.create_scenario(scenario_config)
submission = tp.submit(scenario)

# The events are processed in parallel by the consumer.
# So we need to wait for the consumer to process the events.
sleep(1)

# Stop the consumer and unregister from the Notifier.
consumer.stop()
Notifier.unregister(registration_id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import requests

import taipy as tp
import taipy.gui.builder as tgb
from taipy import Config, Gui, Orchestrator
from taipy.core import SubmissionStatus
from taipy.core.job.status import Status
from taipy.core.notification import CoreEventConsumerBase, EventEntityType, EventOperation, Notifier
from taipy.gui import notify

##### Configuration and Functions #####


def fail_task(name: str):
raise Exception(f"This function is trigger by {name} and is supposed to fail, and it did!")


name_data_node_cfg = Config.configure_data_node(id="input_name", default_data="Florian")
message_data_node_cfg = Config.configure_data_node(id="message")
build_msg_task_cfg = Config.configure_task("build_msg", fail_task, name_data_node_cfg, message_data_node_cfg)
scenario_cfg = Config.configure_scenario("scenario", task_configs=[build_msg_task_cfg])

value = "Default text"


#### Notification function to be called ####


def trigger_api_of_job_failure(job_id):
requests.get("http://127.0.0.1:5000/replace-this-with-your-api", params={"message": f"Job {job_id} failed."})


class JobFailureCoreConsumer(CoreEventConsumerBase):
def __init__(self):
reg_id, queue = Notifier.register(
entity_type=EventEntityType.JOB, operation=EventOperation.UPDATE, attribute_name="status"
) # Adapt the registration to the events you want to listen to
super().__init__(reg_id, queue)

def process_event(self, event):
if event.attribute_value == Status.FAILED:
trigger_api_of_job_failure(event.entity_id)


#### Normal callbacks ####


def create_and_submit_scenario(state):
scenario = tp.create_scenario(config=scenario_cfg)
tp.submit(scenario)


#### Page ####

with tgb.Page() as page:
tgb.text("{value}")
tgb.button("Create and submit a scenario!", on_action=create_and_submit_scenario)


if __name__ == "__main__":
orchestrator = Orchestrator()
gui = Gui(page)
orchestrator.run()
JobFailureCoreConsumer().start()
gui.run()
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from time import sleep

from taipy.core.notification import CoreEventConsumerBase, Event, EventEntityType, EventOperation, Notifier


def double(nb):
return nb * 2


# Define a custom event consumer.
class MyEventConsumer(CoreEventConsumerBase):
def process_event(self, event: Event):
# Custom event processing logic here'
print(f"Received event of type: {event.entity_type}; and of operation: {event.operation}.")


if __name__ == "__main__":
import taipy as tp
from taipy import Config

print(f"(1) Number of jobs: {len(tp.get_jobs())}.")

# Create a scenario configuration with 2 sequential tasks.
input_data_node_cfg = Config.configure_data_node("my_input", default_data=21)
print_task_cfg = Config.configure_task("print_task", print, input_data_node_cfg)
scenario_config = Config.configure_scenario("my_scenario", [print_task_cfg])

# Run the core service.
tp.Orchestrator().run()

# Register to the Notifier to retrieve events related to all scenarios' creations.
registration_id, registered_queue = Notifier.register(EventEntityType.SCENARIO, operation=EventOperation.CREATION)

# Create a consumer and start it.
consumer = MyEventConsumer(registration_id, registered_queue)
consumer.start()

# Create a scenario and submit it.
scenario = tp.create_scenario(scenario_config)

sleep(1)

# Stop the consumer and unregister from the Notifier.
consumer.stop()
Notifier.unregister(registration_id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import taipy as tp
import taipy.gui.builder as tgb
from taipy import Config, Gui, Orchestrator
from taipy.core import SubmissionStatus
from taipy.core.notification import CoreEventConsumerBase, EventEntityType, EventOperation, Notifier
from taipy.gui import notify

##### Configuration and Functions #####


def build_message(name: str):
return f"Hello {name}!"


name_data_node_cfg = Config.configure_data_node(id="input_name", default_data="Florian")
message_data_node_cfg = Config.configure_data_node(id="message")
build_msg_task_cfg = Config.configure_task("build_msg", build_message, name_data_node_cfg, message_data_node_cfg)
scenario_cfg = Config.configure_scenario("scenario", task_configs=[build_msg_task_cfg])

value = "Default text"


#### Notification function to be called ####


def notify_users_of_creation(state):
state.value = "Scenario created and submitted"
notify(state, "s", "Scenario Created")


def notify_users_of_update(state, new_value_of_dn):
print("Value of Data Node:", new_value_of_dn)
state.value = f"Data Node updated with value: {new_value_of_dn}"
notify(state, "i", "Data Node Updated")


class SpecificCoreConsumer(CoreEventConsumerBase):
def __init__(self, gui):
self.gui = gui
reg_id, queue = Notifier.register() # Adapt the registration to the events you want to listen to
super().__init__(reg_id, queue)

def process_event(self, event):
if event.operation == EventOperation.CREATION:
if event.entity_type == EventEntityType.SCENARIO:
self.gui.broadcast_callback(notify_users_of_creation)
elif event.operation == EventOperation.UPDATE:
if event.entity_type == EventEntityType.SUBMISSION:
print(event)
if event.attribute_value == SubmissionStatus.COMPLETED:
scenario_id = event.metadata["origin_entity_id"]
scenario = tp.get(scenario_id)
new_value_of_dn = scenario.message.read()
self.gui.broadcast_callback(notify_users_of_update, [new_value_of_dn])


#### Normal callbacks ####


def create_and_submit_scenario(state):
scenario = tp.create_scenario(config=scenario_cfg)
tp.submit(scenario)


#### Page ####

with tgb.Page() as page:
tgb.text("{value}")
tgb.button("Create and submit a scenario!", on_action=create_and_submit_scenario)


if __name__ == "__main__":
orchestrator = Orchestrator()
gui = Gui(page)
orchestrator.run()
SpecificCoreConsumer(gui).start()
gui.run()
74 changes: 74 additions & 0 deletions docs/manuals/userman/sdm/events/events-description.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
The `Event^` object in Taipy is a notification mechanism for any changes occurring within Taipy Core,
particularly those related to the Scenario and Data Management. It is a crucial part of the system that allows
tracking and responding to changes in the state of various entities managed by Taipy, such as data nodes, jobs,
and scenarios.

An `Event^` object is composed of several key attributes that describe what happened, the
type of operation performed, the entity concerned and its type, and other contextual
details.

1. `entity_type`
- **Type**: `EventEntityType^` (Enum)
- **Description**: Specifies the type of entity that has undergone a change. This
attribute helps identify the nature of the object affected. The possible entity
types are:

- `CYCLE`
- `SCENARIO`
- `SEQUENCE`
- `TASK`
- `DATA_NODE`
- `JOB`
- `SUBMISSION`

2. `operation`
- **Type**: `EventOperation^` (Enum)
- **Description**: Indicates the type of operation performed. The `operation` attribute
is essential for understanding the nature of the change. The possible operations are:

- `CREATION` - An entity has been created.
- `UPDATE` - An entity has been updated.
- `DELETION` - An entity has been deleted.
- `SUBMISSION` - An entity has been submitted for processing.

3. `entity_id`
- **Type**: `str`
- **Description**: The unique identifier for the entity that has been changed. This
ID allows you to precisely identify which object in the system the event refers to.

4. `attribute_name`
- **Type**: `str`
- **Description**: The name of the specific attribute that has been changed within
the entity. This attribute is only relevant for `UPDATE` operations, where
a specific field of an entity has been modified.

5. `attribute_value`
- **Type**: `Any`
- **Description**: The new value of the changed attribute. Like `attribute_name`, this
only applies to `UPDATE` operations.

6. `metadata`
- **Type**: `dict`
- **Description**: A dictionary containing additional metadata about the source of the
event. This can include context-specific information that provides more insight into
the event's origin or purpose.

7. `creation_date`
- **Type**: `datetime`
- **Description**: The exact date and time the event was created.

!!! example "Scenario creation event"
For example, when a scenario is created, an event is emitted with
the following attributes:
- `entity_type`: `EventEntityType.SCENARIO^`
- `operation`: `EventOperation.CREATION^`
- `entity_id`: the id of the scenario
- `creation_date`: the date and time of the event creation

Events are particularly useful when you want to:
- Update the user interface (e.g., update a list of scenarios when a new one is created)
- Trigger an action (e.g., automatically submit a scenario when its input data is updated)
- Notify end-users (e.g., send a GUI notification to all users when a job fails)
- etc.

For more examples, see the [examples](examples.md) page.
38 changes: 38 additions & 0 deletions docs/manuals/userman/sdm/events/examples.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
Examples of using Taipy event notifications to capture and consume *events*.

# Real-Time GUI Updates with Taipy Event Consumers

This script defines a custom event consumer class `SpecificCoreConsumer`, which listens
for all events published by Taipy Core and triggers GUI notification based on those events.
It includes determining if the event is published from a `Scenario^` entity or `DataNode^` entity
and if the action is `CREATION^` or `UPDATE^`.

!!! example
```python linenums="1"
{%
include-markdown "./code-example/user-changes-notifier.py"
comments=false
%}
```

This snippet shows a how you can capture and process events to notify user whenever
a new scenario is created or the value of a data node is updated.
For more details, see the [registration](understanding-topics.md) page.

# External API triggered with Taipy Event Consumers

This script defines a custom event consumer class `JobFailureCoreConsumer`, which listens
for all events published by Taipy Core, when a `JOB^` entity's `status` attribute is `UPDATE`,
and triggers an external API call based on the `JOB^`'s `id`.

!!! example
```python linenums="1"
{%
include-markdown "./code-example/external-api-call-notifier.py"
comments=false
%}
```

This snippet shows a how you can capture and process `JOB^` events when an `UPDATE` is made to the `status`
of the `JOB^` and request an external API.
For more details, see the [registration](understanding-topics.md) page.
Loading

0 comments on commit 4c2845a

Please sign in to comment.