Skip to content

Commit b8113cf

Browse files
Create table sitrep_messages
1 parent 8d50385 commit b8113cf

File tree

3 files changed

+42
-7
lines changed

3 files changed

+42
-7
lines changed

airflow/dags/download_secmar_json_ftp.py

+3
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ def secmar_json_sql_task(dag, filename):
128128
insert_snosan_json_unique = secmar_json_sql_task(dag, "insert_snosan_json_unique")
129129
insert_snosan_json_unique.set_upstream(copy_json_data)
130130

131+
insert_sitrep_messages = secmar_json_sql_task(dag, "insert_sitrep_messages")
132+
insert_sitrep_messages.set_upstream(insert_snosan_json_unique)
133+
131134
start_create_codes_tables = DummyOperator(task_id="start_create_codes_tables", dag=dag)
132135
start_create_codes_tables.set_upstream(insert_snosan_json_unique)
133136
end_create_codes_tables = DummyOperator(task_id="end_create_codes_tables", dag=dag)

airflow/dags/replace_secmar_database.py

+3-7
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
)
2727
dag.doc_md = __doc__
2828

29-
tables = SECMAR_TABLES + ["operations_stats", "moyens_snsm"]
29+
tables = SECMAR_TABLES + ["operations_stats", "moyens_snsm", "sitrep_messages"]
3030

3131
template = "sudo -u postgres pg_dump -c --no-owner {tables} {schema} > {output}"
3232
dump_command = template.format(
@@ -35,9 +35,7 @@
3535
tables=" ".join(["-t " + t for t in tables]),
3636
)
3737

38-
dump_local_database = BashOperator(
39-
task_id="dump_local_database", bash_command=dump_command, dag=dag
40-
)
38+
dump_local_database = BashOperator(task_id="dump_local_database", bash_command=dump_command, dag=dag)
4139

4240
template = "psql -U {user} -h {host} {schema} < {input}"
4341
target_connection = PostgresHook.get_connection("target_secmar")
@@ -55,7 +53,5 @@
5553
)
5654
import_remote_database.set_upstream(dump_local_database)
5755

58-
delete_dump_file = BashOperator(
59-
task_id="delete_dump_file", bash_command="rm " + OUTPUT_PATH, dag=dag
60-
)
56+
delete_dump_file = BashOperator(task_id="delete_dump_file", bash_command="rm " + OUTPUT_PATH, dag=dag)
6157
delete_dump_file.set_upstream(import_remote_database)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
CREATE TABLE sitrep_messages (
2+
operation_id bigint primary key,
3+
sitrep text
4+
);
5+
6+
INSERT INTO sitrep_messages
7+
(operation_id, sitrep)
8+
select
9+
sjoi.operation_id,
10+
array_to_string(array_agg(value order by coalesce(mapping.dst, key) asc), ' ')
11+
from (
12+
select
13+
u.data->>'chrono' as chrono,
14+
replace(key, 'paragraphe', '') as key,
15+
value
16+
from snosan_json_unique u, jsonb_each(u.data->'messages'->0) AS obj(key, value)
17+
where key like 'paragraphe%'
18+
) t
19+
join snosan_json_operation_id sjoi on sjoi.chrono = t.chrono
20+
left join (
21+
select '1' src, 'A' dst
22+
union select '2' src, 'B' dst
23+
union select '3' src, 'C' dst
24+
union select '4' src, 'D' dst
25+
union select '5' src, 'E' dst
26+
union select '6' src, 'F' dst
27+
union select '7' src, 'G' dst
28+
union select '8' src, 'H' dst
29+
union select '9' src, 'I' dst
30+
union select '10' src, 'J' dst
31+
union select '11' src, 'K' dst
32+
union select '12' src, 'L' dst
33+
union select '13' src, 'M' dst
34+
union select '14' src, 'N' dst
35+
) mapping on mapping.src = key
36+
group by 1;

0 commit comments

Comments
 (0)