1
- """Utilities"""
2
-
3
- import argparse
4
- import re
5
- import sys
6
- from threading import Thread
7
- from typing import TYPE_CHECKING , Callable , Iterable , Optional , Tuple , Type , Union
1
+ """Internal utilities"""
8
2
9
3
from rich .progress import track
10
4
11
- if TYPE_CHECKING :
12
- from .client import Client
13
- from .events import EventFilter
14
-
15
5
16
6
class ConfigProgressBar :
17
7
"""Display a configuration Progress Bar."""
@@ -36,172 +26,6 @@ def close(self) -> None:
36
26
self .tracker .close ()
37
27
38
28
39
- class AttrDict (dict ):
40
- """Dictionary that allows accessing values using the "dot notation" as attributes."""
41
-
42
- def __init__ (self , * args , ** kwargs ) -> None :
43
- super ().__init__ (
44
- {
45
- _camel_to_snake (key ): to_attrdict (value )
46
- for key , value in dict (* args , ** kwargs ).items ()
47
- }
48
- )
49
-
50
- def __getattr__ (self , attr ):
51
- if attr in self :
52
- return self [attr ]
53
- raise AttributeError ("Attribute not found: " + str (attr ))
54
-
55
- def __setattr__ (self , attr , val ):
56
- if attr in self :
57
- raise AttributeError ("Attribute-style access is read only" )
58
- super ().__setattr__ (attr , val )
59
-
60
-
61
- def _camel_to_snake (name : str ) -> str :
62
- name = re .sub ("(.)([A-Z][a-z]+)" , r"\1_\2" , name )
63
- name = re .sub ("__([A-Z])" , r"_\1" , name )
64
- name = re .sub ("([a-z0-9])([A-Z])" , r"\1_\2" , name )
65
- return name .lower ()
66
-
67
-
68
- def to_attrdict (obj ):
69
- if isinstance (obj , AttrDict ):
70
- return obj
71
- if isinstance (obj , dict ):
72
- return AttrDict (obj )
73
- if isinstance (obj , list ):
74
- return [to_attrdict (elem ) for elem in obj ]
75
- return obj
76
-
77
-
78
- def run_client_cli (
79
- hooks : Optional [Iterable [Tuple [Callable , Union [type , "EventFilter" ]]]] = None ,
80
- argv : Optional [list ] = None ,
81
- ** kwargs ,
82
- ) -> None :
83
- """Run a simple command line app, using the given hooks.
84
-
85
- Extra keyword arguments are passed to the internal Rpc object.
86
- """
87
- from .client import Client # noqa
88
-
89
- _run_cli (Client , hooks , argv , ** kwargs )
90
-
91
-
92
- def run_bot_cli (
93
- hooks : Optional [Iterable [Tuple [Callable , Union [type , "EventFilter" ]]]] = None ,
94
- argv : Optional [list ] = None ,
95
- ** kwargs ,
96
- ) -> None :
97
- """Run a simple bot command line using the given hooks.
98
-
99
- Extra keyword arguments are passed to the internal Rpc object.
100
- """
101
- from .client import Bot # noqa
102
-
103
- _run_cli (Bot , hooks , argv , ** kwargs )
104
-
105
-
106
- def _run_cli (
107
- client_type : Type ["Client" ],
108
- hooks : Optional [Iterable [Tuple [Callable , Union [type , "EventFilter" ]]]] = None ,
109
- argv : Optional [list ] = None ,
110
- ** kwargs ,
111
- ) -> None :
112
- from .rpc import Rpc # noqa
113
-
114
- if argv is None :
115
- argv = sys .argv
116
-
117
- parser = argparse .ArgumentParser (prog = argv [0 ] if argv else None )
118
- parser .add_argument (
119
- "accounts_dir" ,
120
- help = "accounts folder (default: current working directory)" ,
121
- nargs = "?" ,
122
- )
123
- parser .add_argument ("--email" , action = "store" , help = "email address" )
124
- parser .add_argument ("--password" , action = "store" , help = "password" )
125
- args = parser .parse_args (argv [1 :])
126
-
127
- with Rpc (accounts_dir = args .accounts_dir , ** kwargs ) as rpc :
128
- core_version = rpc .get_system_info ().deltachat_core_version
129
- accounts = rpc .get_all_account_ids ()
130
- accid = accounts [0 ] if accounts else rpc .add_account ()
131
-
132
- client = client_type (rpc , hooks )
133
- client .logger .debug ("Running deltachat core %s" , core_version )
134
- if not rpc .is_configured (accid ):
135
- assert args .email , "Account is not configured and email must be provided"
136
- assert args .password , "Account is not configured and password must be provided"
137
- configure_thread = Thread (
138
- target = client .configure , args = (accid , args .email , args .password )
139
- )
140
- configure_thread .start ()
141
- client .run_forever ()
142
-
143
-
144
- def extract_addr (text : str ) -> str :
145
- """extract email address from the given text."""
146
- match = re .match (r".*\((.+@.+)\)" , text )
147
- if match :
148
- text = match .group (1 )
149
- text = text .rstrip ("." )
150
- return text .strip ()
151
-
152
-
153
- def parse_system_image_changed (text : str ) -> Optional [Tuple [str , bool ]]:
154
- """return image changed/deleted info from parsing the given system message text."""
155
- text = text .lower ()
156
- match = re .match (r"group image (changed|deleted) by (.+)." , text )
157
- if match :
158
- action , actor = match .groups ()
159
- return (extract_addr (actor ), action == "deleted" )
160
- return None
161
-
162
-
163
- def parse_system_title_changed (text : str ) -> Optional [Tuple [str , str ]]:
164
- text = text .lower ()
165
- match = re .match (r'group name changed from "(.+)" to ".+" by (.+).' , text )
166
- if match :
167
- old_title , actor = match .groups ()
168
- return (extract_addr (actor ), old_title )
169
- return None
170
-
171
-
172
- def parse_system_add_remove (text : str ) -> Optional [Tuple [str , str , str ]]:
173
- """return add/remove info from parsing the given system message text.
174
-
175
- returns a (action, affected, actor) tuple.
176
- """
177
- # You removed member a@b.
178
- # You added member a@b.
179
- # Member Me (x@y) removed by a@b.
180
- # Member x@y added by a@b
181
- # Member With space ([email protected] ) removed by [email protected] .
182
- # Member With space ([email protected] ) removed by Another member ([email protected] ).",
183
- # Group left by some one ([email protected] ).
184
- # Group left by [email protected] .
185
- text = text .lower ()
186
-
187
- match = re .match (r"member (.+) (removed|added) by (.+)" , text )
188
- if match :
189
- affected , action , actor = match .groups ()
190
- return action , extract_addr (affected ), extract_addr (actor )
191
-
192
- match = re .match (r"you (removed|added) member (.+)" , text )
193
- if match :
194
- action , affected = match .groups ()
195
- return action , extract_addr (affected ), "me"
196
-
197
- if text .startswith ("group left by " ):
198
- addr = extract_addr (text [13 :])
199
- if addr :
200
- return "removed" , addr , addr
201
-
202
- return None
203
-
204
-
205
29
def parse_docstring (txt ) -> tuple :
206
30
"""parse docstring, returning a tuple with short and long description"""
207
31
description = txt
0 commit comments