Skip to content

Commit 46f0e8b

Browse files
committed
First commit
1 parent 65e72c4 commit 46f0e8b

File tree

4 files changed

+250
-0
lines changed

4 files changed

+250
-0
lines changed

Diff for: azureeventhubreceiveevent/README.md

+168
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
# Azure Event Hub receiver event Python example
2+
3+
This folder contains a Python application example that handles Event Hubs on Microsoft Azure.
4+
5+
It handles an Event Hub and receives events from an event hub event stream.
6+
7+
## Requirements
8+
9+
* You must have a [Microsoft Azure](https://azure.microsoft.com/) subscription.
10+
11+
* The code was written for:
12+
* Python 3
13+
* Azure SDKs for Python
14+
15+
* You install individual Azure library packages on a per-project basis depending on your needs. It is recommended using Python virtual environments for each project. There is no standalone "SDK" installer for Python.
16+
17+
* Install the specified python packages.
18+
19+
```bash
20+
pip install -r requirements.txt
21+
```
22+
23+
## Using the code
24+
25+
* Configure your Azure access.
26+
27+
You must create an Azure AD service principal in order to enable application to connect resources into Azure. The service principal grants your application to manage resources in your Azure subscription.
28+
29+
The Azure SDKs Libraries for Java allow you to use several authentication schemes.
30+
31+
The application uses an authentication file for authenticating.
32+
33+
The credentials are taken from `AZURE_AUTH_LOCATION` environment variable.
34+
35+
You can create a service principal and generate this file using Azure CLI 2.0 or using the Azure cloud shell.
36+
37+
* Make sure you select your subscription by:
38+
39+
```bash
40+
az account set --subscription <name or id>
41+
```
42+
43+
and you have the privileges to create service principals.
44+
45+
* Execute the following command for creating the service principal and the authentication file:
46+
47+
```bash
48+
az ad sp create-for-rbac --sdk-auth > my.azureauth
49+
```
50+
51+
* Set the `AZURE_AUTH_LOCATION` environment variable in your Operating System with the path of your authentication file.
52+
53+
```bash
54+
AZURE_AUTH_LOCATION = /path/to/my.azureauth
55+
```
56+
57+
* Create an Event Hubs Namespace and an Event Hub.
58+
59+
1. Create an Event Hubs Namespace.
60+
61+
An Event Hubs namespace provides a unique scoping container, in which you create one or more event hubs.
62+
63+
To create a namespace in your resource group using the portal, do the following actions:
64+
65+
1. You must create the Event Hubs Namespace, using the Azure console.
66+
67+
2. Select the your data for: Suscription, Resource group, Namespace name and Location.
68+
69+
3. Choose Basic for the pricing tier.
70+
71+
2. Create an Event Hub.
72+
73+
You must create the Event Hub, using the Azure console.
74+
75+
To create an event hub within the namespace, do the following actions:
76+
77+
1. On the Event Hubs Namespace page, select `Event Hubs` in the left menu.
78+
79+
2. At the top of the window, select `+ Event Hub`.
80+
81+
3. Type a name for your event hub, then select `Create`.
82+
83+
3. Create a SAS Policy.
84+
85+
You must create the SAS Policy, using the Azure console.
86+
87+
1. On the Event Hubs page for the Event Hub created, select `Shared access policies` in the left menu.
88+
89+
2. At the top of the window, select `+ Add`.
90+
91+
3. Type a name for your Policy, select `Manage`, that includes `Send` and `Listen`, then select `Create`.
92+
93+
* Create an Azure storage account and a blob container.
94+
95+
Create an Azure storage account and a blob container in it by doing the following steps, using the Azure console:
96+
97+
1. Create an Azure Storage account.
98+
99+
2. Create a blob container.
100+
101+
3. Get the connection string to the storage account.
102+
103+
* Configure your application.
104+
105+
We store the configuration information in a config file (`app.cfg`). The file content is:
106+
107+
```bash
108+
[Configuration]
109+
StorageAccountConnectionString=<STORAGE_ACCOUNT_CONNECTION_STRING>
110+
BlobName=<BLOB_NAME>
111+
EventHubConnectionString=<EVENT_HUB_CONNECTION_STRING>
112+
EventHubName=<EVENT_HUB_NAME>
113+
```
114+
115+
You only need to edit the file `app.cfg` and modify the values of:
116+
117+
* `<STORAGE_ACCOUNT_CONNECTION_STRING>` by the Connection string to the Storage Account.
118+
* `<BLOB_NAME>` by the Blob name in the Storage Account.
119+
* `<EVENT_HUB_CONNECTION_STRING>` by the Connection string to the Event Hub.
120+
* `<EVENT_HUB_NAME>` by the name of the Event Hub.
121+
122+
The application uses this information for accessing your Event Hub and Storage Account.
123+
124+
* Run the code.
125+
126+
Execute the receiver application:
127+
128+
```bash
129+
python receivereh.py
130+
```
131+
132+
You should see the next message in you receiver application:
133+
134+
```bash
135+
Waiting for an event
136+
```
137+
138+
The aplication is waiting for some event from the Event Hub.
139+
140+
* Test the application.
141+
142+
You must send an event to your Event Hub.
143+
144+
You can use the Python application `sendereh.py` (Event Hub send event). You can get it following this link: [../azureeventhubsendevent/](../azureeventhubsendevent)
145+
146+
In another command line console, execute the sender application:
147+
148+
```bash
149+
python sendereh.py
150+
```
151+
152+
You should see the next message in your sender application:
153+
154+
```bash
155+
Preparing batch of events
156+
Sending batch of events
157+
Sent
158+
```
159+
160+
When the receiver application gets the event, you should see the next message in you receiver application:
161+
162+
```bash
163+
Received the event: "<XXXXXXXXXXXXXXX>"
164+
from the partition with ID: <X>
165+
EnqueuedTimeUtc: <YYYY-MM-DD HH:MM:SS.XXXXXXXXX>
166+
SequenceNumber: <XX>
167+
Offset: <XXXX>
168+
```

Diff for: azureeventhubreceiveevent/app.cfg

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[Configuration]
2+
StorageAccountConnectionString=<STORAGE_ACCOUNT_CONNECTION_STRING>
3+
BlobName=<BLOB_NAME>
4+
EventHubConnectionString=<EVENT_HUB_CONNECTION_STRING>
5+
EventHubName=<EVENT_HUB_NAME>

Diff for: azureeventhubreceiveevent/receivereh.py

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#!/usr/bin/python
2+
# -*- coding: utf-8 -*-
3+
# receivereh.py
4+
# It is an example that handles Event Hubs on Microsoft Azure.
5+
# It handles an Event Hub and receives events from an event hub event stream.
6+
7+
import sys
8+
import os
9+
import configparser
10+
import asyncio
11+
from azure.eventhub.aio import EventHubConsumerClient
12+
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
13+
14+
15+
def loadcfg():
16+
"""
17+
Read configuration information from a config file
18+
and return the values in a dictionary.
19+
"""
20+
config_file = 'app.cfg'
21+
if os.path.exists(config_file):
22+
config = configparser.RawConfigParser()
23+
config.read(config_file)
24+
else:
25+
print('Config file "' + config_file + '" does not exist')
26+
sys.exit(1)
27+
28+
return dict(config.items('Configuration'))
29+
30+
31+
async def on_event(partition_context, event):
32+
# Print the event data.
33+
print('Received an event: \"{}\"'.format(event.body_as_str(encoding='UTF-8')))
34+
print(f' from the partition with ID: {partition_context.partition_id}')
35+
print(f' EnqueuedTimeUtc: {event.enqueued_time}')
36+
print(f' SequenceNumber: {event.sequence_number}')
37+
print(f' Offset: {event.offset}')
38+
39+
# Update the checkpoint so that the program doesn't read the events
40+
# that it has already read when you run it next time.
41+
await partition_context.update_checkpoint(event)
42+
43+
44+
async def run(storage_account_connection_string, blob_name, event_hub_connection_string, event_hub_name):
45+
# Create an Azure blob checkpoint store to store the checkpoints.
46+
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_account_connection_string,
47+
blob_name)
48+
49+
# Create a consumer client for the event hub.
50+
client = EventHubConsumerClient.from_connection_string(event_hub_connection_string,
51+
consumer_group="$Default",
52+
eventhub_name=event_hub_name,
53+
checkpoint_store=checkpoint_store)
54+
55+
async with client:
56+
# Call the receive method. Read from the beginning of the partition (starting_position: "-1")
57+
await client.receive(on_event=on_event, starting_position="-1")
58+
59+
60+
def main():
61+
# Read configuration information
62+
config_dict = loadcfg()
63+
cfg_storage_account_connection_string = config_dict['storageaccountconnectionstring']
64+
cfg_blob_name = config_dict['blobname']
65+
cfg_event_hub_connection_string = config_dict['eventhubconnectionstring']
66+
cfg_event_hub_name = config_dict['eventhubname']
67+
68+
print('Waiting for events')
69+
loop = asyncio.get_event_loop()
70+
# Run the main method.
71+
loop.run_until_complete(run(cfg_storage_account_connection_string, cfg_blob_name, cfg_event_hub_connection_string, cfg_event_hub_name))
72+
73+
74+
if __name__ == '__main__':
75+
main()

Diff for: azureeventhubreceiveevent/requirements.txt

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
azure-eventhub
2+
azure-eventhub-checkpointstoreblob-aio

0 commit comments

Comments
 (0)