Skip to content

Commit

Permalink
ajout de models opendata
Browse files Browse the repository at this point in the history
  • Loading branch information
kolok committed Feb 18, 2025
1 parent ba1817a commit f08a29c
Show file tree
Hide file tree
Showing 15 changed files with 682 additions and 8 deletions.
4 changes: 2 additions & 2 deletions dags/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK='true'
# WARNING=Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
# for other purpose (development, test and especially production usage) build/extend Airflow image.
_PIP_ADDITIONAL_REQUIREMENTS=${_PIP_ADDITIONAL_REQUIREMENTS:-}
AIRFLOW_CONN_QFDMO-DJANGO-DB='postgres://qfdmo:qfdmo@lvao-db:5432/qfdmo' # pragma: allowlist secret
AIRFLOW_CONN_QFDMO_DJANGO_DB='postgres://qfdmo:qfdmo@lvao-db:5432/qfdmo' # pragma: allowlist secret
DATABASE_URL=postgis://qfdmo:qfdmo@lvao-db:5432/qfdmo # pragma: allowlist secret

# DBT env vars
Expand All @@ -37,4 +37,4 @@ POSTGRES_PORT=5432
POSTGRES_USER=qfdmo
POSTGRES_PASSWORD=qfdmo
POSTGRES_DB=qfdmo
POSTGRES_SCHEMA=public
POSTGRES_SCHEMA=public
25 changes: 21 additions & 4 deletions dags/acteur_views/dags/build_vue.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,49 @@
) as dag:

run_dbt_exhaustive_acteurs_model = BashOperator(
task_id="build_exhaustive_acteur",
task_id="build_exhaustive_acteurs",
bash_command=(
"cd /opt/airflow/dbt/ && dbt run --select qfdmo.exhaustive_acteurs"
),
dag=dag,
)
test_dbt_exhaustive_acteurs_model = BashOperator(
task_id="test_exhaustive_acteur",
task_id="test_exhaustive_acteurs",
bash_command=(
"cd /opt/airflow/dbt/ && dbt test --select qfdmo.exhaustive_acteurs"
),
dag=dag,
)
run_dbt_carte_acteurs_model = BashOperator(
task_id="build_exhaustive_acteur",
task_id="build_carte_acteurs",
bash_command=("cd /opt/airflow/dbt/ && dbt run --select qfdmo.carte_acteurs"),
dag=dag,
)
test_dbt_carte_acteurs_model = BashOperator(
task_id="test_exhaustive_acteur",
task_id="test_carte_acteurs",
bash_command=("cd /opt/airflow/dbt/ && dbt test --select qfdmo.carte_acteurs"),
dag=dag,
)
run_dbt_opendata_acteurs_model = BashOperator(
task_id="build_opendata_acteurs",
bash_command=(
"cd /opt/airflow/dbt/ && dbt run --select qfdmo.opendata_acteurs"
),
dag=dag,
)
test_dbt_opendata_acteurs_model = BashOperator(
task_id="test_opendata_acteurs",
bash_command=(
"cd /opt/airflow/dbt/ && dbt test --select qfdmo.opendata_acteurs"
),
dag=dag,
)

(
run_dbt_exhaustive_acteurs_model
>> test_dbt_exhaustive_acteurs_model
>> run_dbt_carte_acteurs_model
>> test_dbt_carte_acteurs_model
>> run_dbt_opendata_acteurs_model
>> test_dbt_opendata_acteurs_model
)
4 changes: 4 additions & 0 deletions dbt/models/opendata_acteurs/opendata_acteur.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Open-data acteurs: the filtered acteurs that own at least one open-data
-- proposition de service.
-- The join yields one row per matching proposition de service, so DISTINCT
-- collapses the result back to distinct acteur rows.
SELECT DISTINCT acteur.*
FROM {{ ref('temp_opendata_filteredacteur') }} AS acteur
INNER JOIN {{ ref('opendata_propositionservice') }} AS ps
    ON ps.acteur_id = acteur.identifiant_unique
25 changes: 25 additions & 0 deletions dbt/models/opendata_acteurs/opendata_acteur_acteur_services.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Acteur <-> acteur-service links restricted to the open-data acteurs.
-- Links of acteurs without a parent are kept under their own identifier;
-- links of acteurs that have a parent are re-attached to that parent.
WITH standalone_links AS (
    -- Acteurs with no parent: distinct (acteur, service) pairs as-is.
    SELECT DISTINCT
        link.vueacteur_id AS acteur_id,
        link.acteurservice_id AS acteurservice_id
    FROM qfdmo_vueacteur_acteur_services AS link
    INNER JOIN {{ ref('temp_opendata_filteredacteur') }} AS acteur
        ON acteur.identifiant_unique = link.vueacteur_id
    WHERE acteur.parent_id IS NULL
),

inherited_links AS (
    -- Acteurs with a parent: distinct (parent, service) pairs.
    SELECT DISTINCT
        acteur.parent_id AS acteur_id,
        link.acteurservice_id AS acteurservice_id
    FROM qfdmo_vueacteur_acteur_services AS link
    INNER JOIN {{ ref('temp_opendata_filteredacteur') }} AS acteur
        ON acteur.identifiant_unique = link.vueacteur_id
    WHERE acteur.parent_id IS NOT NULL
),

all_links AS (
    -- NOTE(review): UNION ALL keeps a pair twice if a parent acteur carries
    -- the same service as one of its children; confirm that cannot happen.
    SELECT
        standalone_links.acteur_id,
        standalone_links.acteurservice_id
    FROM standalone_links
    UNION ALL
    SELECT
        inherited_links.acteur_id,
        inherited_links.acteurservice_id
    FROM inherited_links
)

-- Keep only links whose acteur is part of the open-data acteurs and number
-- the rows to provide a surrogate id.
SELECT
    ROW_NUMBER() OVER (
        ORDER BY all_links.acteur_id, all_links.acteurservice_id
    ) AS id,
    all_links.acteur_id,
    all_links.acteurservice_id
FROM all_links
INNER JOIN {{ ref('opendata_acteur') }} AS opendata
    ON opendata.identifiant_unique = all_links.acteur_id
100 changes: 100 additions & 0 deletions dbt/models/opendata_acteurs/opendata_acteur_formatted.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
-- Formatted open-data export: one row per open-data acteur, with the
-- column headers exposed to consumers (kept in French, byte-for-byte).
WITH deduplicated_opened_sources AS (
    -- Pipe-separated, de-duplicated source labels per acteur.
    SELECT
        da.uuid,
        string_agg(DISTINCT source.libelle, '|' ORDER BY source.libelle) AS sources_list
    FROM {{ ref('opendata_acteur') }} AS da
    LEFT JOIN {{ ref('opendata_acteur_sources') }} AS das
        ON da.identifiant_unique = das.acteur_id
    LEFT JOIN qfdmo_source AS source
        ON das.source_id = source.id
    GROUP BY da.uuid
),

proposition_services AS (
    -- JSON array of {action, sous_categories} objects per acteur.
    -- Both aggregates are ordered so the exported JSON is stable between runs.
    SELECT
        da.uuid,
        jsonb_agg(
            jsonb_build_object(
                'action', a.code,
                'sous_categories', (
                    SELECT jsonb_agg(sco.code ORDER BY sco.code)
                    FROM {{ ref('opendata_propositionservice_sous_categories') }} AS pssc
                    INNER JOIN qfdmo_souscategorieobjet AS sco
                        ON pssc.souscategorieobjet_id = sco.id
                    WHERE pssc.propositionservice_id = ps.id
                )
            )
            ORDER BY a.code
        ) AS services
    FROM {{ ref('opendata_acteur') }} AS da
    INNER JOIN {{ ref('opendata_propositionservice') }} AS ps
        ON ps.acteur_id = da.identifiant_unique
    INNER JOIN qfdmo_action AS a
        ON ps.action_id = a.id
    GROUP BY da.uuid
),

acteur_labels AS (
    -- Pipe-separated, de-duplicated quality-label codes per acteur.
    SELECT
        da.uuid,
        string_agg(DISTINCT lq.code, '|' ORDER BY lq.code) AS labels
    FROM {{ ref('opendata_acteur') }} AS da
    LEFT JOIN {{ ref('opendata_acteur_labels') }} AS dal
        ON da.identifiant_unique = dal.acteur_id
    LEFT JOIN qfdmo_labelqualite AS lq
        ON dal.labelqualite_id = lq.id
    GROUP BY da.uuid
),

acteur_services AS (
    -- Pipe-separated, de-duplicated acteur-service codes per acteur.
    SELECT
        da.uuid,
        string_agg(DISTINCT as2.code, '|' ORDER BY as2.code) AS services
    FROM {{ ref('opendata_acteur') }} AS da
    LEFT JOIN {{ ref('opendata_acteur_acteur_services') }} AS daas
        ON da.identifiant_unique = daas.acteur_id
    LEFT JOIN qfdmo_acteurservice AS as2
        ON daas.acteurservice_id = as2.id
    GROUP BY da.uuid
)

SELECT
    da.uuid AS "Identifiant",
    -- Attribution always starts with the two fixed entries; the acteur's own
    -- sources are appended when there are any.
    CASE
        WHEN ds.sources_list IS NOT NULL
            THEN 'Longue Vie Aux Objets|ADEME|' || ds.sources_list
        ELSE 'Longue Vie Aux Objets|ADEME'
    END AS "Paternité",
    da.nom AS "Nom",
    da.nom_commercial AS "Nom commercial",
    da.siren AS "SIREN",
    da.siret AS "SIRET",
    da.description AS "Description",
    acteur_type.code AS "Type d'acteur",
    da.url AS "Site web",
    -- The phone number is blanked when it starts with 06 or 07, or when the
    -- acteur has the 'carteco' source.
    CASE
        WHEN da.telephone ~ '^0[67]' THEN NULL
        WHEN EXISTS (
            SELECT 1
            FROM {{ ref('opendata_acteur_sources') }} AS das2
            INNER JOIN qfdmo_source AS s
                ON das2.source_id = s.id
            WHERE das2.acteur_id = da.identifiant_unique
                AND s.code = 'carteco'
        ) THEN NULL
        ELSE da.telephone
    END AS "Téléphone",
    da.adresse AS "Adresse",
    da.adresse_complement AS "Complément d'adresse",
    da.code_postal AS "Code postal",
    da.ville AS "Ville",
    ST_Y(da.location::geometry) AS "latitude",
    ST_X(da.location::geometry) AS "longitude",
    al.labels AS "Qualités et labels",
    da.public_accueilli AS "Public accueilli",
    da.reprise AS "Reprise",
    da.exclusivite_de_reprisereparation AS "Exclusivité de reprise/réparation",
    da.uniquement_sur_rdv AS "Uniquement sur RDV",
    acs.services AS "Type de services",
    ps.services::text AS "Propositions de services",
    to_char(da.modifie_le, 'YYYY-MM-DD') AS "Date de dernière modification"
FROM {{ ref('opendata_acteur') }} AS da
LEFT JOIN qfdmo_acteurtype AS acteur_type
    ON da.acteur_type_id = acteur_type.id
-- INNER JOIN: only open license.
-- NOTE(review): deduplicated_opened_sources returns one row for every acteur
-- (it is built from LEFT JOINs with no license predicate), so this join does
-- not filter anything as written. Confirm whether a license filter is
-- missing from that CTE.
INNER JOIN deduplicated_opened_sources AS ds
    ON da.uuid = ds.uuid
LEFT JOIN proposition_services AS ps
    ON da.uuid = ps.uuid
LEFT JOIN acteur_labels AS al
    ON da.uuid = al.uuid
LEFT JOIN acteur_services AS acs
    ON da.uuid = acs.uuid
WHERE da.statut = 'ACTIF'
    -- NOTE(review): a NULL public_accueilli makes NOT IN evaluate to unknown,
    -- which drops the row; confirm the column cannot be NULL.
    AND da.public_accueilli NOT IN ('AUCUN', 'PROFESSIONNELS')
    -- "_" is a single-character wildcard in LIKE, so it is escaped here to
    -- match the literal substring "_reparation_" only.
    AND da.identifiant_unique NOT LIKE '%!_reparation!_%' ESCAPE '!'
ORDER BY da.uuid
25 changes: 25 additions & 0 deletions dbt/models/opendata_acteurs/opendata_acteur_labels.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Acteur <-> quality-label links restricted to the open-data acteurs.
-- Links of acteurs without a parent are kept under their own identifier;
-- links of acteurs that have a parent are re-attached to that parent.
WITH standalone_links AS (
    -- Acteurs with no parent: distinct (acteur, label) pairs as-is.
    SELECT DISTINCT
        link.vueacteur_id AS acteur_id,
        link.labelqualite_id AS labelqualite_id
    FROM qfdmo_vueacteur_labels AS link
    INNER JOIN {{ ref('temp_opendata_filteredacteur') }} AS acteur
        ON acteur.identifiant_unique = link.vueacteur_id
    WHERE acteur.parent_id IS NULL
),

inherited_links AS (
    -- Acteurs with a parent: distinct (parent, label) pairs.
    SELECT DISTINCT
        acteur.parent_id AS acteur_id,
        link.labelqualite_id AS labelqualite_id
    FROM qfdmo_vueacteur_labels AS link
    INNER JOIN {{ ref('temp_opendata_filteredacteur') }} AS acteur
        ON acteur.identifiant_unique = link.vueacteur_id
    WHERE acteur.parent_id IS NOT NULL
),

all_links AS (
    -- NOTE(review): UNION ALL keeps a pair twice if a parent acteur carries
    -- the same label as one of its children; confirm that cannot happen.
    SELECT
        standalone_links.acteur_id,
        standalone_links.labelqualite_id
    FROM standalone_links
    UNION ALL
    SELECT
        inherited_links.acteur_id,
        inherited_links.labelqualite_id
    FROM inherited_links
)

-- Keep only links whose acteur is part of the open-data acteurs and number
-- the rows to provide a surrogate id.
SELECT
    ROW_NUMBER() OVER (
        ORDER BY all_links.acteur_id, all_links.labelqualite_id
    ) AS id,
    all_links.acteur_id,
    all_links.labelqualite_id
FROM all_links
INNER JOIN {{ ref('opendata_acteur') }} AS opendata
    ON opendata.identifiant_unique = all_links.acteur_id
25 changes: 25 additions & 0 deletions dbt/models/opendata_acteurs/opendata_acteur_sources.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Acteur <-> source links restricted to the open-data acteurs.
-- An acteur without a parent is linked to its own source; an acteur with a
-- parent contributes its source to that parent.
WITH nochild_acteur_sources AS (
    -- Acteurs with no parent: their own (acteur, source) pair.
    SELECT
        a.identifiant_unique AS acteur_id,
        a.source_id AS source_id
    FROM {{ ref('temp_opendata_filteredacteur') }} AS a
    WHERE a.parent_id IS NULL
        AND a.source_id IS NOT NULL
    GROUP BY a.identifiant_unique, a.source_id
),

parentacteur_sources AS (
    -- Acteurs with a parent: distinct (parent, source) pairs.
    -- Rows without a source are skipped, as in the no-parent branch, so the
    -- link table never contains a NULL source_id.
    SELECT
        a.parent_id AS acteur_id,
        a.source_id AS source_id
    FROM {{ ref('temp_opendata_filteredacteur') }} AS a
    WHERE a.parent_id IS NOT NULL
        AND a.source_id IS NOT NULL
    GROUP BY a.parent_id, a.source_id
),

acteur_sources AS (
    SELECT
        nochild_acteur_sources.acteur_id,
        nochild_acteur_sources.source_id
    FROM nochild_acteur_sources
    UNION ALL
    SELECT
        parentacteur_sources.acteur_id,
        parentacteur_sources.source_id
    FROM parentacteur_sources
)

-- Keep only links whose acteur is part of the open-data acteurs and number
-- the rows to provide a surrogate id.
SELECT
    ROW_NUMBER() OVER (ORDER BY s.acteur_id, s.source_id) AS id,
    s.acteur_id,
    s.source_id
FROM acteur_sources AS s
INNER JOIN {{ ref('opendata_acteur') }} AS a
    ON a.identifiant_unique = s.acteur_id
8 changes: 8 additions & 0 deletions dbt/models/opendata_acteurs/opendata_propositionservice.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- One proposition de service per (acteur, action) pair, keeping the smallest
-- id of the pair. Only propositions that have at least one open-data
-- sous-categorie are considered.
SELECT
    MIN(ps.id) AS id,
    ps.acteur_id,
    ps.action_id
FROM {{ ref('temp_opendata_propositionservice') }} AS ps
WHERE EXISTS (
    -- Semi-join: a proposition qualifies as soon as one sous-categorie row
    -- points at it.
    SELECT 1
    FROM {{ ref('opendata_propositionservice_sous_categories') }} AS pssc
    WHERE pssc.propositionservice_id = ps.id
)
GROUP BY
    ps.acteur_id,
    ps.action_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- Proposition-de-service <-> sous-categorie links for the open-data export.
-- Parent propositions are keyed by a synthetic "<parent_id>_<action_id>"
-- identifier; propositions of acteurs without a parent keep their own id.
WITH parent_sous_categories AS (
    -- One row per (synthetic parent proposition id, sous-categorie), keeping
    -- the smallest link id of each group.
    SELECT
        MIN(pssc.id) AS id,
        CONCAT(parent_ps.parent_id::text, '_', parent_ps.action_id::text) AS propositionservice_id,
        pssc.souscategorieobjet_id AS souscategorieobjet_id
    FROM qfdmo_vuepropositionservice_sous_categories AS pssc
    INNER JOIN {{ ref('temp_opendata_parentpropositionservice') }} AS parent_ps
        ON parent_ps.id = pssc.vuepropositionservice_id
    GROUP BY
        CONCAT(parent_ps.parent_id::text, '_', parent_ps.action_id::text),
        pssc.souscategorieobjet_id
),

nochild_sous_categories AS (
    -- Links of propositions whose acteur has no parent, kept as-is.
    SELECT
        pssc.id AS id,
        pssc.vuepropositionservice_id AS propositionservice_id,
        pssc.souscategorieobjet_id AS souscategorieobjet_id
    FROM qfdmo_vuepropositionservice_sous_categories AS pssc
    INNER JOIN {{ ref('temp_opendata_propositionservice') }} AS ps
        ON ps.id = pssc.vuepropositionservice_id
    INNER JOIN {{ ref('temp_opendata_filteredacteur') }} AS acteur
        ON acteur.identifiant_unique = ps.acteur_id
    WHERE acteur.parent_id IS NULL
)

SELECT
    parent_sous_categories.id,
    parent_sous_categories.propositionservice_id,
    parent_sous_categories.souscategorieobjet_id
FROM parent_sous_categories
UNION ALL
SELECT
    nochild_sous_categories.id,
    nochild_sous_categories.propositionservice_id,
    nochild_sous_categories.souscategorieobjet_id
FROM nochild_sous_categories
Loading

0 comments on commit f08a29c

Please sign in to comment.