-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhass.py
88 lines (76 loc) · 3.34 KB
/
hass.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import config
import urllib3
from enum import Enum
from requests import get, post
import utils
from utils import log_stderr, log_stdout
from datetime import datetime, timedelta
if not config.HASS_CHECK_SSL_CERT:
urllib3.disable_warnings()
headers = {"Authorization": "Bearer " + config.HASS_TOKEN, "content-type": "application/json"}
old_entity = {}
refresh_time = publish_time = datetime.now()
STATE_API_URL = "api/states/"
STATUS_ENTITY_API_URL = STATE_API_URL + "sensor."
SERVICE_API_URL = "api/services/media_player/"
SERVICE_API_VOLUME_SET = SERVICE_API_URL + "volume_set"
SERVICE_API_VOLUME_MUTE = SERVICE_API_URL + "volume_mute"
SERVICE_API_PLAY = SERVICE_API_URL + "media_play"
SERVICE_API_PAUSE = SERVICE_API_URL + "media_pause"
SERVICE_API_STOP = SERVICE_API_URL + "media_stop"
SERVICE_API_SELECT_SOURCE = SERVICE_API_URL + "select_source"
RETRY_MINUTES = 2
REFRESH_MINUTES = 10
class Service(Enum):
VOLUME_SET = 0
VOLUME_MUTE = 1
PLAY = 2
PAUSE = 3
STOP = 4
SELECT_SOURCE = 5
def get_status(io_status):
global publish_time
try:
url = config.HASS_SERVER + STATE_API_URL + config.HASS_PLAYER_ENTITY_ID
if datetime.now() >= publish_time:
response = get(url, headers=headers, verify=config.HASS_CHECK_SSL_CERT)
utils.log_stdout('HASS', 'GET_STATE ({}): {}'.format(config.HASS_PLAYER_ENTITY_ID, response.text))
json = response.json()
io_status.friendly_name = json['attributes']['friendly_name']
io_status.state = json['state']
io_status.volume_level = json['attributes']['volume_level']
io_status.sources = json['attributes']['source_list']
io_status.source = json['attributes']['source']
io_status.is_volume_muted = json['attributes']['is_volume_muted']
except Exception as e:
log_stderr('*HASS* ERR: GET_STATE ({}): {}'.format(config.HASS_PLAYER_ENTITY_ID, e))
# exit and delay next publish for 60 secs
publish_time = datetime.now() + timedelta(seconds=60)
def set_service(io_status, service):
global publish_time
try:
json = {"entity_id": config.HASS_PLAYER_ENTITY_ID}
if service == Service.VOLUME_MUTE:
url = config.HASS_SERVER + SERVICE_API_VOLUME_MUTE
json.update({"is_volume_muted": io_status.is_volume_muted})
elif service == Service.VOLUME_SET:
url = config.HASS_SERVER + SERVICE_API_VOLUME_SET
json.update({"volume_level": io_status.volume_level})
elif service == Service.SELECT_SOURCE:
url = config.HASS_SERVER + SERVICE_API_SELECT_SOURCE
json.update({"source": io_status.source})
elif service == Service.PLAY:
url = config.HASS_SERVER + SERVICE_API_PLAY
elif service == Service.PAUSE:
url = config.HASS_SERVER + SERVICE_API_PAUSE
elif service == Service.STOP:
url = config.HASS_SERVER + SERVICE_API_STOP
if datetime.now() >= publish_time:
response = post(url, headers=headers, verify=config.HASS_CHECK_SSL_CERT, json=json)
except Exception as e:
log_stderr('*HASS* ERR: SET_MUTE ({}): {}'.format(config.HASS_PLAYER_ENTITY_ID, e))
# exit and delay next publish for 60 secs
publish_time = datetime.now() + timedelta(seconds=60)