diff --git a/.circleci/config.yml b/.circleci/config.yml
index 384733218a..32cf21b39a 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -271,7 +271,7 @@ jobs:
           name: run bmo specific tests
           command: |
             [[ -f build_info/only_version_changed.txt ]] && exit 0
-            docker-compose -f docker-compose.test.yml run --build bmo.test test_bmo -q -f t/bmo/*.t
+            docker-compose -f docker-compose.test.yml run --build bmo.test test_bmo -q -f t/bmo/*.t extensions/*/t/bmo/*.t
       - *store_log
 
 workflows:
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1c67b0f7ba..9cdfa8b220 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -35,7 +35,7 @@ jobs:
       - name: Build Docker test images
         run: docker-compose -f docker-compose.test.yml build
       - name: Run bmo specific tests
-        run: docker-compose -f docker-compose.test.yml run -e CI=1 bmo.test test_bmo -q -f t/bmo/*.t
+        run: docker-compose -f docker-compose.test.yml run -e CI=1 bmo.test test_bmo -q -f t/bmo/*.t extensions/*/t/bmo/*.t
 
   test_selenium_1:
     runs-on: ubuntu-latest
diff --git a/conf/checksetup_answers.txt b/conf/checksetup_answers.txt
index 4b8e830f76..251d0f3fb2 100644
--- a/conf/checksetup_answers.txt
+++ b/conf/checksetup_answers.txt
@@ -60,6 +60,12 @@
 $answer{'sitemapindex_google_host'} = 'gcs';
 $answer{'sitemapindex_google_bucket'} = 'sitemapindex';
 $answer{'sitemapindex_google_service_account'} = 'test';
+$answer{'bmo_etl_enabled'} = 1;
+$answer{'bmo_etl_base_url'} = 'http://bq:9050';
+$answer{'bmo_etl_service_account'} = 'test';
+$answer{'bmo_etl_project_id'} = 'test';
+$answer{'bmo_etl_dataset_id'} = 'bugzilla';
+
 $answer{'duo_uri'} = 'http://localhost:8001';
 $answer{'duo_client_id'} = '6rZ3KnrL04uyGjLd8foO';
 $answer{'duo_client_secret'} = '3vg6cm0Gj0DpC6ZJACXdZ1NrVRi1AhkwjfXnlFaJ';
diff --git a/docker-compose.test.yml b/docker-compose.test.yml
index 152f86393e..563fc601c1 100644
--- a/docker-compose.test.yml
+++ b/docker-compose.test.yml
@@ -37,6 +37,7 @@ services:
       - memcached
       - s3
       - gcs
+      - bq
 
   externalapi.test:
     build: *build_bmo
@@ -69,3 +70,13 @@ services:
       - ./docker/gcs/attachments:/data/attachments
       - ./docker/gcs/sitemapindex:/data/sitemapindex
       - ./docker/gcs/mining:/data/mining
+
+  bq:
+    build:
+      context: ./docker/bigquery
+      dockerfile: Dockerfile
+    ports:
+      - 9050:9050
+    working_dir: /work
+    command: |
+      --project=test --data-from-yaml=/data.yaml
diff --git a/docker-compose.yml b/docker-compose.yml
index c238bb07a5..2ee79aab43 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -43,6 +43,7 @@ services:
       - s3
       - gcs
      - externalapi.test
+      - bq
     ports:
       - 8000:8000
 
@@ -143,6 +144,19 @@ services:
     ports:
       - 8001:8001
 
+  bq:
+    platform: linux/x86_64
+    build:
+      context: ./docker/bigquery
+      dockerfile: Dockerfile
+    ports:
+      - 9050:9050
+    volumes:
+      - bmo-bq-data:/work
+    working_dir: /work
+    command: |
+      --project=test --data-from-yaml=/data.yaml --log-level=debug
+
 volumes:
   bmo-mysql-db:
   bmo-data-dir:
@@ -150,3 +164,4 @@ volumes:
   bmo-gcs-attachments:
   bmo-gcs-sitemapindex:
   bmo-gcs-mining:
+  bmo-bq-data:
diff --git a/docker/bigquery/Dockerfile b/docker/bigquery/Dockerfile
new file mode 100644
index 0000000000..7cb792483f
--- /dev/null
+++ b/docker/bigquery/Dockerfile
@@ -0,0 +1,3 @@
+FROM ghcr.io/goccy/bigquery-emulator:0.6.5
+
+COPY data.yaml /data.yaml
diff --git a/docker/bigquery/data.yaml b/docker/bigquery/data.yaml
new file mode 100644
index 0000000000..13b93aa469
--- /dev/null
+++ b/docker/bigquery/data.yaml
@@ -0,0 +1,185 @@
+projects:
+  - id: test
+    datasets:
+      - id: bugzilla
+        tables:
+          - id: bugs
+            columns:
+              - name: id
+                type: INTEGER
+              - name: assignee_id
+                type: INTEGER
+              - name: url
+                type: STRING
+              - name: severity
+                type: STRING
+              - name: status
+                type: STRING
+              - name: type
+                type: STRING
+              - name: crash_signature
+                type: STRING
+              - name: component
+                type: STRING
+              - name: creation_ts
+                type: TIMESTAMP
+              - name: updated_ts
+                type: TIMESTAMP
+              - name: op_sys
+                type: STRING
+              - name: priority
+                type: STRING
+              - name: product
+                type: STRING
+              - name: platform
+                type: STRING
+              - name: reporter_id
+                type: INTEGER
+              - name: resolution
+                type: STRING
+              - name: summary
+                type: STRING
+              - name: whiteboard
+                type: STRING
+              - name: milestone
+                type: STRING
+              - name: version
+                type: STRING
+              - name: team_name
+                type: STRING
+              - name: group
+                type: STRING
+              - name: classification
+                type: STRING
+              - name: is_public
+                type: BOOLEAN
+              - name: comment_count
+                type: INTEGER
+              - name: cc_count
+                type: INTEGER
+              - name: vote_count
+                type: INTEGER
+              - name: snapshot_date
+                type: DATE
+          - id: attachments
+            columns:
+              - name: id
+                type: INT64
+              - name: bug_id
+                type: INT64
+              - name: creation_ts
+                type: TIMESTAMP
+              - name: description
+                type: STRING
+              - name: filename
+                type: STRING
+              - name: is_obsolete
+                type: BOOL
+              - name: content_type
+                type: STRING
+              - name: updated_ts
+                type: TIMESTAMP
+              - name: submitter_id
+                type: INT64
+              - name: snapshot_date
+                type: DATE
+          - id: flags
+            columns:
+              - name: attachment_id
+                type: INT64
+              - name: bug_id
+                type: INT64
+              - name: creation_ts
+                type: TIMESTAMP
+              - name: updated_ts
+                type: TIMESTAMP
+              - name: requestee_id
+                type: INT64
+              - name: setter_id
+                type: INT64
+              - name: name
+                type: STRING
+              - name: value
+                type: STRING
+              - name: snapshot_date
+                type: DATE
+          - id: tracking_flags
+            columns:
+              - name: bug_id
+                type: INT64
+              - name: name
+                type: STRING
+              - name: value
+                type: STRING
+              - name: snapshot_date
+                type: DATE
+          - id: keywords
+            columns:
+              - name: bug_id
+                type: INT64
+              - name: keyword
+                type: STRING
+              - name: snapshot_date
+                type: DATE
+          - id: see_also
+            columns:
+              - name: bug_id
+                type: INT64
+              - name: url
+                type: STRING
+              - name: snapshot_date
+                type: DATE
+          - id: bug_mentors
+            columns:
+              - name: bug_id
+                type: INT64
+              - name: user_id
+                type: INT64
+              - name: snapshot_date
+                type: DATE
+          - id: bug_dependencies
+            columns:
+              - name: bug_id
+                type: INT64
+              - name: depends_on_id
+                type: INT64
+              - name: snapshot_date
+                type: DATE
+          - id: bug_regressions
+            columns:
+              - name: bug_id
+                type: INT64
+              - name: regresses_id
+                type: INT64
+              - name: snapshot_date
+                type: DATE
+          - id: bug_duplicates
+            columns:
+              - name: bug_id
+                type: INT64
+              - name: duplicate_of_id
+                type: INT64
+              - name: snapshot_date
+                type: DATE
+          - id: users
+            columns:
+              - name: id
+                type: INT64
+              - name: last_seen
+                type: TIMESTAMP
+              - name: email
+                type: STRING
+              - name: nick
+                type: STRING
+              - name: name
+                type: STRING
+              - name: is_staff
+                type: BOOL
+              - name: is_trusted
+                type: BOOL
+              - name: ldap_email
+                type: STRING
+              - name: is_new
+                type: BOOL
+              - name: snapshot_date
+                type: DATE
diff --git a/extensions/BMO/Extension.pm b/extensions/BMO/Extension.pm
index b6bbcf7af5..9b3f7a2f68 100755
--- a/extensions/BMO/Extension.pm
+++ b/extensions/BMO/Extension.pm
@@ -1384,6 +1384,21 @@ sub db_schema_abstract_schema {
     ],
     INDEXES => [job_last_run_name_idx => {FIELDS => ['name'], TYPE => 'UNIQUE',},],
   };
+  $args->{schema}->{bmo_etl_cache} = {
+    FIELDS => [
+      id            => {TYPE => 'INT3', NOTNULL => 1,},
+      snapshot_date => {TYPE => 'DATETIME', NOTNULL => 1,},
+      table_name    => {TYPE => 'VARCHAR(100)', NOTNULL => 1,},
+      data          => {TYPE => 'LONGBLOB', NOTNULL => 1,},
+    ],
+    INDEXES =>
+      [bmo_etl_cache_idx => {FIELDS => ['id', 'snapshot_date', 'table_name']}],
+  };
+  $args->{schema}->{bmo_etl_locked} = {
+    FIELDS => [
+      value => {TYPE => 'VARCHAR(20)', NOTNULL => 1,},
+    ],
+  };
 }
 
 sub install_update_db {
@@ -2588,6 +2603,33 @@ sub config_modify_panels {
     name => 'enable_triaged_keyword',
     type => 'b',
   };
+  push @{$args->{panels}->{reports}->{params}},
+    {
+    name    => 'bmo_etl_enabled',
+    type    => 'b',
+    default => 0,
+    };
+  push @{$args->{panels}->{reports}->{params}},
+    {
+    name => 'bmo_etl_base_url',
+    type => 't',
+    };
+  push @{$args->{panels}->{reports}->{params}},
+    {
+    name => 'bmo_etl_service_account',
+    type => 't',
+    };
+  push @{$args->{panels}->{reports}->{params}},
+    {
+    name => 'bmo_etl_project_id',
+    type => 't',
+    };
+  push @{$args->{panels}->{reports}->{params}},
+    {
+    name => 'bmo_etl_dataset_id',
+    type => 't',
+    };
+
 }
 
 sub comment_after_add_tag {
diff --git a/extensions/BMO/bin/export_bmo_etl.pl b/extensions/BMO/bin/export_bmo_etl.pl
new file mode 100644
index 0000000000..9d41ef3219
--- /dev/null
+++ b/extensions/BMO/bin/export_bmo_etl.pl
@@ -0,0 +1,908 @@
+#!/usr/bin/env perl
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# This Source Code Form is "Incompatible With Secondary Licenses", as
+# defined by the Mozilla Public License, v. 2.0.
+
+use 5.10.1;
+use strict;
+use warnings;
+use lib qw(. lib local/lib/perl5);
+
+use Bugzilla;
+use Bugzilla::Attachment;
+use Bugzilla::Bug;
+use Bugzilla::Constants;
+use Bugzilla::Flag;
+use Bugzilla::Group;
+use Bugzilla::User;
+use Bugzilla::Extension::Review::FlagStateActivity;
+
+use HTTP::Headers;
+use HTTP::Request;
+use IO::Compress::Gzip qw(gzip $GzipError);
+use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
+use List::Util qw(any);
+use LWP::UserAgent::Determined;
+use Mojo::File qw(path);
+use Mojo::JSON qw(decode_json encode_json false true);
+use Mojo::Util qw(getopt);
+
+# The BigQuery API cannot handle payloads larger than 10MB, so
+# we send the data in blocks.
+use constant API_BLOCK_COUNT => 1000;
+
+# Products for which we should not send data to ETL, such as Legal.
+use constant EXCLUDE_PRODUCTS => ('Legal',);
+
+Bugzilla->usage_mode(USAGE_MODE_CMDLINE);
+getopt
+  't|test'            => \my $test,
+  'v|verbose'         => \my $verbose,
+  's|snapshot-date=s' => \my $snapshot_date;
+
+# Sanity checks
+Bugzilla->params->{bmo_etl_enabled} || die "BMO ETL not enabled.\n";
+
+my $base_url = Bugzilla->params->{bmo_etl_base_url};
+$base_url || die "Invalid BigQuery base URL.\n";
+
+my $project_id = Bugzilla->params->{bmo_etl_project_id};
+$project_id || die "Invalid BigQuery project ID.\n";
+
+my $dataset_id = Bugzilla->params->{bmo_etl_dataset_id};
+$dataset_id || die "Invalid BigQuery dataset ID.\n";
+
+# Check to make sure another instance is not currently running
+check_and_set_lock();
+
+# Use replica if available
+my $dbh = Bugzilla->switch_to_shadow_db();
+
+my $ua = LWP::UserAgent::Determined->new(
+  agent                 => 'Bugzilla',
+  keep_alive            => 10,
+  requests_redirectable => [qw(GET HEAD DELETE PUT)],
+);
+$ua->timing('1,2,4,8,16,32');
+$ua->timeout(30);
+if (my $proxy = Bugzilla->params->{proxy_url}) {
+  $ua->proxy(['https', 'http'], $proxy);
+}
+
+# This date will be added to each object as it is being sent
+if (!$snapshot_date) {
+  $snapshot_date = $dbh->selectrow_array(
+    'SELECT ' . $dbh->sql_date_format('LOCALTIMESTAMP(0)', '%Y-%m-%d'));
+}
+
+# Excluded bugs: list of bug IDs whose data we should not send to ETL (e.g. Legal)
+our %excluded_bugs = ();
+
+# Bugs that are private to one or more groups
+our %private_bugs = ();
+
+# In order to avoid entering duplicate data, we will first query BigQuery
+# to make sure other entries with this date are not already present.
+check_for_duplicates();
+
+# Process each table to be sent to ETL
+process_bugs();
+process_attachments();
+process_flags();
+process_flag_state_activity();
+process_tracking_flags();
+process_keywords();
+process_see_also();
+process_users();
+
+process_two_columns(
+  'bug_mentors', 'bug_mentors',
+  ['bug_id', 'user_id'],
+  ['bug_id', 'user_id']
+);
+process_two_columns(
+  'dependencies', 'bug_dependencies',
+  ['blocked', 'dependson'],
+  ['bug_id', 'depends_on_id']
+);
+process_two_columns(
+  'regressions', 'bug_regressions',
+  ['regresses', 'regressed_by'],
+  ['bug_id', 'regresses_id']
+);
+process_two_columns(
+  'duplicates', 'bug_duplicates',
+  ['dupe', 'dupe_of'],
+  ['bug_id', 'duplicate_of_id']
+);
+
+# If we are done, remove the lock
+delete_lock();
+
+### Functions
+
+sub process_bugs {
+  my $table_name  = 'bugs';
+  my $count       = 0;
+  my $last_offset = 0;
+
+  my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM bugs');
+  print "Processing $total $table_name.\n" if $verbose;
+
+  my $sth
+    = $dbh->prepare(
+    'SELECT bug_id AS id, delta_ts AS modification_time FROM bugs ORDER BY bug_id LIMIT ? OFFSET ?'
+    );
+
+  while ($count < $total) {
+    my @bugs = ();
+
+    $sth->execute(API_BLOCK_COUNT, $last_offset);
+
+    while (my ($id, $mod_time) = $sth->fetchrow_array()) {
+      print "Processing id $id with mod_time of $mod_time.\n" if $verbose;
+
+      # First check to see if we have a cached version with the same modification date
+      my $data = get_cache($id, $table_name, $mod_time);
+      if (!$data) {
+        print "$table_name id $id with time $mod_time not found in cache.\n"
+          if $verbose;
+
+        my $obj = Bugzilla::Bug->new($id);
+
+        my $bug_is_private = scalar @{$obj->groups_in};
+
+        if (any { $obj->product eq $_ } EXCLUDE_PRODUCTS) {
+          $excluded_bugs{$obj->id} = 1;
+          next;
+        }
+
+        $private_bugs{$obj->id} = 1 if $bug_is_private;
+
+        # Standard non-sensitive fields
+        $data = {
+          id             => $obj->id,
+          status         => $obj->bug_status,
+          type           => $obj->bug_type,
+          component      => $obj->component,
+          creation_ts    => $obj->creation_ts,
+          updated_ts     => $obj->delta_ts,
+          op_sys         => $obj->op_sys,
+          product        => $obj->product,
+          platform       => $obj->rep_platform,
+          reporter_id    => $obj->reporter->id,
+          version        => $obj->version,
+          team_name      => $obj->component_obj->team_name,
+          classification => $obj->classification,
+          comment_count  => $obj->comment_count,
+          vote_count     => $obj->votes,
+        };
+
+        # Fields that require custom values based on criteria
+        $data->{assignee_id}
+          = $obj->assigned_to->login ne 'nobody@mozilla.org'
+          ? $obj->assigned_to->id
+          : undef;
+        $data->{url}
+          = (!$bug_is_private && $obj->bug_file_loc) ? $obj->bug_file_loc : undef;
+        $data->{severity} = $obj->bug_severity ne '--' ? $obj->bug_severity : undef;
+        $data->{crash_signature}
+          = (!$bug_is_private && $obj->cf_crash_signature)
+          ? $obj->cf_crash_signature
+          : undef;
+        $data->{priority}   = $obj->priority ne '--' ? $obj->priority   : undef;
+        $data->{resolution} = $obj->resolution       ? $obj->resolution : undef;
+        $data->{summary}    = !$bug_is_private       ? $obj->short_desc : undef;
+        $data->{whiteboard}
+          = (!$bug_is_private && $obj->status_whiteboard)
+          ? $obj->status_whiteboard
+          : undef;
+        $data->{milestone}
+          = $obj->target_milestone ne '---' ? $obj->target_milestone : undef;
+        $data->{is_public} = $bug_is_private ? false : true;
+        $data->{cc_count}  = scalar @{$obj->cc || []};
+
+        # If the bug is in more than one group, pick the group with the fewest members
+        if (!$bug_is_private) {
+          $data->{group} = undef;
+        }
+        elsif (scalar @{$obj->groups_in} == 1) {
+          my $groups = $obj->groups_in;
+          $data->{group} = $groups->[0]->name;
+        }
+        else {
+          $data->{group} = get_multi_group_value($obj);
+        }
+
+        # Store a copy of the data for use in later executions
+        store_cache($obj->id, $table_name, $obj->delta_ts, $data);
+      }
+
+      push @bugs, $data;
+
+      $count++;
+    }
+
+    $last_offset += API_BLOCK_COUNT;
+
+    # Send the rows to the server
+    send_data($table_name, \@bugs, $count) if @bugs;
+  }
+}
+
+sub process_attachments {
+  my $table_name  = 'attachments';
+  my $count       = 0;
+  my $last_offset = 0;
+
+  my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM attachments');
+  print "Processing $total $table_name.\n" if $verbose;
+
+  my $sth
+    = $dbh->prepare(
+    'SELECT attach_id, modification_time FROM attachments ORDER BY attach_id LIMIT ? OFFSET ?'
+    );
+
+  while ($count < $total) {
+    my @results = ();
+
+    $sth->execute(API_BLOCK_COUNT, $last_offset);
+
+    while (my ($id, $mod_time) = $sth->fetchrow_array()) {
+      print "Processing id $id with mod_time of $mod_time.\n" if $verbose;
+
+      # First check to see if we have a cached version with the same modification date
+      my $data = get_cache($id, $table_name, $mod_time);
+      if (!$data) {
+        print "$table_name id $id with time $mod_time not found in cache.\n"
+          if $verbose;
+
+        my $obj = Bugzilla::Attachment->new($id);
+
+        next if $excluded_bugs{$obj->bug_id};
+
+        # Standard non-sensitive fields
+        $data = {
+          id           => $obj->id,
+          bug_id       => $obj->bug_id,
+          creation_ts  => $obj->attached,
+          content_type => $obj->contenttype,
+          updated_ts   => $obj->modification_time,
+          submitter_id => $obj->attacher->id,
+          is_obsolete  => ($obj->isobsolete ? true : false),
+        };
+
+        # Fields that require custom values based on criteria
+        my $bug_is_private = exists $private_bugs{$obj->bug_id};
+        $data->{description} = !$bug_is_private ? $obj->description : undef;
+        $data->{filename}    = !$bug_is_private ? $obj->filename    : undef;
+
+        # Store a new copy of the data for use later
+        store_cache($obj->id, $table_name, $obj->modification_time, $data);
+      }
+
+      push @results, $data;
+
+      $count++;
+    }
+
+    $last_offset += API_BLOCK_COUNT;
+
+    # Send the rows to the server
+    send_data($table_name, \@results, $count) if @results;
+  }
+}
+
+sub process_flags {
+  my $table_name  = 'flags';
+  my $count       = 0;
+  my $last_offset = 0;
+
+  my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM flags');
+  print "Processing $total $table_name.\n" if $verbose;
+
+  my $sth = $dbh->prepare(
+    'SELECT id, modification_date FROM flags ORDER BY id LIMIT ? OFFSET ?');
+
+  while ($count < $total) {
+    my @results = ();
+
+    $sth->execute(API_BLOCK_COUNT, $last_offset);
+
+    while (my ($id, $mod_time) = $sth->fetchrow_array()) {
+      print "Processing id $id with mod_time of $mod_time.\n" if $verbose;
+
+      # First check to see if we have a cached version with the same modification date
+      my $data = get_cache($id, $table_name, $mod_time);
+      if (!$data) {
+        print "$table_name id $id with time $mod_time not found in cache.\n"
+          if $verbose;
+
+        my $obj = Bugzilla::Flag->new($id);
+
+        next if $excluded_bugs{$obj->bug_id};
+
+        $data = {
+          attachment_id => $obj->attach_id || undef,
+          bug_id        => $obj->bug_id,
+          creation_ts   => $obj->creation_date,
+          updated_ts    => $obj->modification_date,
+          requestee_id  => $obj->requestee_id,
+          setter_id     => $obj->setter_id,
+          name          => $obj->type->name,
+          value         => $obj->status,
+        };
+
+        # Store a new copy of the data for use later
+        store_cache($obj->id, $table_name, $obj->modification_date, $data);
+      }
+
+      push @results, $data;
+
+      $count++;
+    }
+
+    $last_offset += API_BLOCK_COUNT;
+
+    # Send the rows to the server
+    send_data($table_name, \@results, $count) if @results;
+  }
+}
+
+sub process_flag_state_activity {
+
+  # Process flags that were removed today using the flag_state_activity table.
+  # These entries will also go into the flags table in BigQuery.
+  my $table_name  = 'flag_state_activity';
+  my $count       = 0;
+  my $last_offset = 0;
+
+  my $total
+    = $dbh->selectrow_array(
+    'SELECT COUNT(*) FROM flag_state_activity WHERE status = \'X\' AND flag_when LIKE \''
+      . $snapshot_date
+      . ' %\'');
+  print "Processing $total $table_name.\n" if $verbose;
+
+  my $sth
+    = $dbh->prepare(
+    'SELECT id, flag_when FROM flag_state_activity WHERE status = \'X\' AND flag_when LIKE \''
+      . $snapshot_date
+      . ' %\' ORDER BY id LIMIT ? OFFSET ?');
+
+  while ($count < $total) {
+    my @results = ();
+
+    $sth->execute(API_BLOCK_COUNT, $last_offset);
+
+    while (my ($id, $mod_time) = $sth->fetchrow_array()) {
+      print "Processing id $id with mod_time of $mod_time.\n" if $verbose;
+
+      # First check to see if we have a cached version with the same modification date
+      my $data = get_cache($id, $table_name, $mod_time);
+      if (!$data) {
+        print "$table_name id $id with time $mod_time not found in cache.\n"
+          if $verbose;
+
+        my $obj = Bugzilla::Extension::Review::FlagStateActivity->new($id);
+
+        next if $excluded_bugs{$obj->bug_id};
+
+        $data = {
+          attachment_id => $obj->attachment_id || undef,
+          bug_id        => $obj->bug_id,
+          creation_ts   => $obj->flag_when,
+          updated_ts    => $obj->flag_when,
+          requestee_id  => $obj->requestee_id,
+          setter_id     => $obj->setter_id,
+          name          => $obj->type->name,
+          value         => $obj->status,
+        };
+
+        # Store a new copy of the data for use later
+        store_cache($obj->id, $table_name, $obj->flag_when, $data);
+      }
+
+      push @results, $data;
+
+      $count++;
+    }
+
+    $last_offset += API_BLOCK_COUNT;
+
+    # Send the rows to the server
+    send_data('flags', \@results, $count) if @results;
+  }
+}
+
+sub process_tracking_flags {
+  my $table_name  = 'tracking_flags';
+  my $count       = 0;
+  my $last_offset = 0;
+
+  my $total = $dbh->selectrow_array(
+    'SELECT COUNT(*)
+       FROM tracking_flags_bugs
+       JOIN tracking_flags
+         ON tracking_flags_bugs.tracking_flag_id = tracking_flags.id'
+  );
+  print "Processing $total $table_name.\n" if $verbose;
+
+  my $sth = $dbh->prepare(
+    'SELECT tracking_flags.name, tracking_flags_bugs.bug_id, tracking_flags_bugs.value
+       FROM tracking_flags_bugs
+       JOIN tracking_flags
+         ON tracking_flags_bugs.tracking_flag_id = tracking_flags.id
+      ORDER BY tracking_flags_bugs.id LIMIT ? OFFSET ?'
+  );
+
+  while ($count < $total) {
+    my @results = ();
+
+    $sth->execute(API_BLOCK_COUNT, $last_offset);
+
+    while (my ($name, $bug_id, $value) = $sth->fetchrow_array()) {
+      next if $excluded_bugs{$bug_id};
+
+      # Standard fields
+      my $data = {bug_id => $bug_id};
+
+      # Fields that require custom values based on other criteria
+      if (exists $private_bugs{$bug_id}) {
+        $data->{name}  = undef;
+        $data->{value} = undef;
+      }
+      else {
+        $data->{name}  = $name;
+        $data->{value} = $value;
+      }
+
+      push @results, $data;
+
+      $count++;
+    }
+
+    $last_offset += API_BLOCK_COUNT;
+
+    # Send the rows to the server
+    send_data($table_name, \@results, $count) if @results;
+  }
+}
+
+sub process_keywords {
+  my $table_name  = 'keywords';
+  my $count       = 0;
+  my $last_offset = 0;
+
+  my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM keywords');
+  print "Processing $total $table_name.\n" if $verbose;
+
+  my $sth = $dbh->prepare(
+    'SELECT bug_id, keyworddefs.name
+       FROM keywords
+       JOIN keyworddefs
+         ON keywords.keywordid = keyworddefs.id
+      ORDER BY bug_id LIMIT ? OFFSET ?'
+  );
+
+  while ($count < $total) {
+    my @results = ();
+
+    $sth->execute(API_BLOCK_COUNT, $last_offset);
+
+    while (my ($bug_id, $keyword) = $sth->fetchrow_array()) {
+      next if $excluded_bugs{$bug_id};
+
+      # Standard fields
+      my $data = {bug_id => $bug_id};
+
+      # Fields that require custom values based on other criteria
+      $data->{keyword} = !exists $private_bugs{$bug_id} ? $keyword : undef;
+
+      push @results, $data;
+
+      $count++;
+    }
+
+    $last_offset += API_BLOCK_COUNT;
+
+    # Send the rows to the server
+    send_data($table_name, \@results, $count) if @results;
+  }
+}
+
+sub process_see_also {
+  my $table_name  = 'see_also';
+  my $count       = 0;
+  my $last_offset = 0;
+
+  my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM bug_see_also');
+  print "Processing $total $table_name.\n" if $verbose;
+
+  my $sth
+    = $dbh->prepare(
+    'SELECT bug_id, value, class FROM bug_see_also ORDER BY bug_id LIMIT ? OFFSET ?'
+    );
+
+  while ($count < $total) {
+    my @results = ();
+
+    $sth->execute(API_BLOCK_COUNT, $last_offset);
+
+    while (my ($bug_id, $value, $class) = $sth->fetchrow_array()) {
+      next if $excluded_bugs{$bug_id};
+
+      # Standard fields
+      my $data = {bug_id => $bug_id,};
+
+      # Fields that require custom values based on other criteria
+      if ($private_bugs{$bug_id}) {
+        $data->{url} = undef;
+      }
+      elsif ($class =~ /::Local/) {
+        $data->{url} = Bugzilla->localconfig->urlbase . 'show_bug.cgi?id=' . $value;
+      }
+      else {
+        $data->{url} = $value;
+      }
+
+      push @results, $data;
+
+      $count++;
+    }
+
+    $last_offset += API_BLOCK_COUNT;
+
+    # Send the rows to the server
+    send_data($table_name, \@results, $count) if @results;
+  }
+}
+
+sub process_users {
+  my $table_name  = 'users';
+  my $count       = 0;
+  my $last_offset = 0;
+
+  my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM profiles');
+  print "Processing $total $table_name.\n" if $verbose;
+
+  my $sth
+    = $dbh->prepare(
+    'SELECT userid, modification_ts FROM profiles ORDER BY userid LIMIT ? OFFSET ?'
+    );
+
+  while ($count < $total) {
+    my @users = ();
+
+    $sth->execute(API_BLOCK_COUNT, $last_offset);
+
+    while (my ($id, $mod_time) = $sth->fetchrow_array()) {
+      print "Processing id $id with mod_time of $mod_time.\n" if $verbose;
+
+      # First check to see if we have a cached version with the same modification date
+      my $data = get_cache($id, $table_name, $mod_time);
+      if (!$data) {
+        print "$table_name id $id with time $mod_time not found in cache.\n"
+          if $verbose;
+
+        my $obj = Bugzilla::User->new($id);
+
+        # Standard fields
+        $data = {
+          id        => $obj->id,
+          last_seen => ($obj->last_seen_date ? $obj->last_seen_date . ' 00:00:00' : undef),
+          email     => $obj->email,
+          is_new    => ($obj->is_new ? true : false),
+        };
+
+        # Fields that require custom values based on criteria
+        $data->{nick} = $obj->nick ? $obj->nick : undef;
+        $data->{name} = $obj->name ? $obj->name : undef;
+        $data->{is_staff}
+          = $obj->in_group('mozilla-employee-confidential') ? true : false;
+        $data->{is_trusted} = $obj->in_group('editbugs') ? true : false;
+        $data->{ldap_email} = $obj->ldap_email ? $obj->ldap_email : undef;
+
+        # Store a new copy of the data for use later
+        store_cache($obj->id, $table_name, $obj->modification_ts, $data);
+      }
+
+      push @users, $data;
+
+      $count++;
+    }
+
+    $last_offset += API_BLOCK_COUNT;
+
+    # Send the rows to the server
+    send_data($table_name, \@users, $count) if @users;
+  }
+}
+
+sub process_two_columns {
+  my ($table_name, $bq_name, $column_names, $data_names) = @_;
+  my $count       = 0;
+  my $last_offset = 0;
+
+  my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM ' . $table_name);
+  print "Processing $total $table_name.\n" if $verbose;
+
+  my $columns_string = join ', ', @{$column_names};
+  my $order_by       = $column_names->[0];
+
+  my $sth = $dbh->prepare(
+    "SELECT $columns_string FROM $table_name ORDER BY $order_by LIMIT ? OFFSET ?");
OFFSET ?"); + + while ($count < $total) { + my @results = (); + + $sth->execute(API_BLOCK_COUNT, $last_offset); + + while (my ($value1, $value2) = $sth->fetchrow_array()) { + next if $excluded_bugs{$value1}; + + print "Processing values $value1, $value2 for $table_name.\n" if $verbose; + + my $data = {$data_names->[0] => $value1, $data_names->[1] => $value2,}; + + push @results, $data; + + $count++; + } + + $last_offset += API_BLOCK_COUNT; + + # Send the rows to the server + send_data($bq_name, \@results, $count) if @results; + } +} + +sub get_cache { + my ($id, $table, $timestamp) = @_; + + print "Retreiving data from $table for $id with time $timestamp.\n" if $verbose; + + # Retrieve compressed JSON from cache table if it exists + my $gzipped_data = $dbh->selectrow_array( + 'SELECT data FROM bmo_etl_cache WHERE id = ? AND table_name = ? AND snapshot_date = ?', + undef, $id, $table, $timestamp + ); + return undef if !$gzipped_data; + + # First uncompress the JSON and then decode it back to Perl data + my $data; + unless (gunzip \$gzipped_data => \$data) { + delete_lock(); + die "gunzip failed: $GunzipError\n"; + } + return decode_json($data); +} + +sub store_cache { + my ($id, $table, $timestamp, $data) = @_; + + print "Storing data into $table for $id with time $timestamp.\n" if $verbose; + + # Encode the perl data into JSON + $data = encode_json($data); + + # Compress the JSON to save space in the DB + my $gzipped_data; + unless (gzip \$data => \$gzipped_data) { + delete_lock(); + die "gzip failed: $GzipError\n"; + } + + # We need to use the main DB for write operations + my $main_dbh = Bugzilla->dbh_main; + + # Clean out outdated JSON + $main_dbh->do('DELETE FROM bmo_etl_cache WHERE id = ? AND table_name = ?', + undef, $id, $table); + + # Enter new cached JSON + $main_dbh->do( + 'INSERT INTO bmo_etl_cache (id, table_name, snapshot_date, data) VALUES (?, ?, ?, ?)', + undef, $id, $table, $timestamp, $gzipped_data + ); +} + +sub send_data { + my ($table, $all_rows, $current_count) = @_; + + print 'Sending ' + . scalar @{$all_rows} + . " rows to table $table using BigQuery API\n" + if $verbose; + + # Add the same snapshot date to every row sent + foreach my $row (@{$all_rows}) { + $row->{snapshot_date} = $snapshot_date; + } + + my @json_rows = (); + foreach my $row (@{$all_rows}) { + push @json_rows, {json => $row}; + } + + my $big_query = {rows => \@json_rows}; + + if ($test) { + my $filename + = bz_locations()->{'datadir'} . '/' + . $snapshot_date . '-' + . $table . '-' + . $current_count . '.json'; + + print "Writing data to $filename\n" if $verbose; + + my $fh = path($filename)->open('>>'); + print $fh encode_json($big_query) . "\n"; + unless (close $fh) { + delete_lock(); + die "Could not close $filename: $!\n"; + } + + return; + } + + my $http_headers = HTTP::Headers->new; + + # Do not attempt to get access token if running in test environment + if ($base_url !~ /^http:\/\/[^\/]+:9050/) { + my $access_token = _get_access_token(); + $http_headers->header(Authorization => 'Bearer ' . 
+  }
+
+  my $full_path = sprintf 'projects/%s/datasets/%s/tables/%s/insertAll',
+    $project_id, $dataset_id, $table;
+
+  print "Sending to $base_url/$full_path\n" if $verbose;
+
+  my $request = HTTP::Request->new('POST', "$base_url/$full_path", $http_headers);
+  $request->header('Content-Type' => 'application/json');
+  $request->content(encode_json($big_query));
+
+  my $response = $ua->request($request);
+  my $result   = decode_json($response->content);
+
+  if (!$response->is_success
+    || (exists $result->{insertErrors} && @{$result->{insertErrors}}))
+  {
+    delete_lock();
+    die "Google Big Query insert failure:\nRequest:\n"
+      . $request->content
+      . "\n\nResponse:\n"
+      . $response->content . "\n";
+  }
+}
+
+sub _get_access_token {
+  state $access_token;    # We should only need to get this once
+  state $token_expiry;
+
+  # If we already have a token and it has not expired yet, just return it
+  if ($access_token && time < $token_expiry) {
+    return $access_token;
+  }
+
+  # Google Kubernetes allows for the use of Workload Identity. This allows
+  # us to link two service accounts together and give special access for
+  # applications running under Kubernetes. We use the special access to get
+  # an OAuth2 access_token that can then be used for accessing the Google
+  # API such as BigQuery.
+  my $url
+    = sprintf
+    'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/%s/token',
+    Bugzilla->params->{bmo_etl_service_account};
+
+  my $http_headers = HTTP::Headers->new;
+  $http_headers->header('Metadata-Flavor' => 'Google');
+
+  my $request = HTTP::Request->new('GET', $url, $http_headers);
+
+  my $res = $ua->request($request);
+
+  if (!$res->is_success) {
+    delete_lock();
+    die 'Google access token failure: ' . $res->content . "\n";
+  }
+
+  my $result = decode_json($res->decoded_content);
+  $access_token = $result->{access_token};
+  $token_expiry = time + $result->{expires_in};
+
+  return $access_token;
+}
+
+# If a previous process is still performing an export to BigQuery,
+# the lock table will contain a row and we must exit.
+sub check_and_set_lock {
+  return if $test;    # No need if just dumping test files
+
+  my $dbh_main = Bugzilla->dbh_main;
+  my $locked = $dbh_main->selectrow_array('SELECT COUNT(*) FROM bmo_etl_locked');
+  if ($locked) {
+    die "Another process has set a lock. Exiting\n";
+  }
+  $dbh_main->do('INSERT INTO bmo_etl_locked VALUES (?)', undef, 'locked');
+}
+
+# Delete lock from bmo_etl_locked
+sub delete_lock {
+  print "Deleting lock in database\n" if $verbose;
+  Bugzilla->dbh_main->do('DELETE FROM bmo_etl_locked');
+}
+
+sub check_for_duplicates {
+  return if $test;    # no need if just dumping test files
+
+  print "Checking for duplicate data for snapshot date $snapshot_date\n"
+    if $verbose;
+
+  my $http_headers = HTTP::Headers->new;
+
+  # Do not attempt to get access token if running in test environment
+  if ($base_url !~ /^http:\/\/[^\/]+:9050/) {
+    my $access_token = _get_access_token();
+    $http_headers->header(Authorization => 'Bearer ' . $access_token);
+  }
+
+  my $full_path = "projects/$project_id/queries";
+
+  print "Querying $base_url/$full_path\n" if $verbose;
+
+  my $query = {
+    query =>
+      "SELECT count(*) FROM ${project_id}.${dataset_id}.bugs WHERE snapshot_date = '$snapshot_date';",
+    useLegacySql => false,
+  };
+
+  my $request = HTTP::Request->new('POST', "$base_url/$full_path", $http_headers);
+  $request->header('Content-Type' => 'application/json');
+  $request->content(encode_json($query));
+
+  print encode_json($query) . "\n" if $verbose;
"\n" if $verbose; + + my $res = $ua->request($request); + if (!$res->is_success) { + delete_lock(); + die 'Google Big Query query failure: ' . $res->content . "\n"; + } + + my $result = decode_json($res->content); + + my $row_count = $result->{rows}->[0]->{f}->[0]->{v}; + + # Do not export if we have any rows with this snapshot date. + if ($row_count) { + delete_lock(); + die "Duplicate data found for snapshot date $snapshot_date\n"; + } +} + +sub get_multi_group_value { + my ($bug) = @_; + + my $smallest_group_name = undef; + my $smallest_group_count = 0; + + foreach my $group (@{$bug->groups_in}) { + my $user_count = 0; + my $member_data = $group->members_complete; + foreach my $type (keys %{$member_data}) { + $user_count += scalar @{$member_data->{$type}}; + } + if ($user_count < $smallest_group_count) { + $smallest_group_count = $user_count; + $smallest_group_name = $group->name; + } + } + + return $smallest_group_name; +} + +1; diff --git a/extensions/BMO/t/bmo/bmo_etl.t b/extensions/BMO/t/bmo/bmo_etl.t new file mode 100644 index 0000000000..e414b09cdf --- /dev/null +++ b/extensions/BMO/t/bmo/bmo_etl.t @@ -0,0 +1,212 @@ +#!/usr/bin/env perl +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + +use 5.10.1; +use strict; +use warnings; +use lib qw(. lib local/lib/perl5 qa/t/lib); + +use Bugzilla; +use Bugzilla::Constants; + +BEGIN { + Bugzilla->extensions; +} + +use Capture::Tiny qw(capture); +use DateTime; +use QA::Util qw(get_config); +use MIME::Base64 qw(encode_base64 decode_base64); +use Mojo::JSON qw(false); +use Test::Mojo; +use Test::More; + +my $config = get_config(); +my $admin_login = $config->{admin_user_login}; +my $admin_api_key = $config->{admin_user_api_key}; +my $editbugs_api_key = $config->{editbugs_user_api_key}; +my $url = Bugzilla->localconfig->urlbase; +my $snapshot_date = DateTime->now()->strftime('%Y-%m-%d'); + +my $t = Test::Mojo->new(); + +# Allow 1 redirect max +$t->ua->max_redirects(1); + +### Section 1: Create new bug + +my $new_bug_1 = { + product => 'Firefox', + component => 'General', + summary => 'This is a new test bug', + type => 'defect', + version => 'unspecified', + severity => 'blocker', + description => 'This is a new test bug', + flags => [{ + name => 'needinfo', + status => '?', + requestee => $config->{editbugs_user_login}, + }], +}; + +$t->post_ok($url + . 'rest/bug' => {'X-Bugzilla-API-Key' => $admin_api_key} => json => + $new_bug_1)->status_is(200)->json_has('/id'); + +my $bug_id_1 = $t->tx->res->json->{id}; + +# Clear the needinfo which will add an X entry in flag_state_activity +my $needinfo_update = { + comment => {body => 'This is my comment'}, + flags => [{name => 'needinfo', status => 'X'}], +}; +$t->put_ok($url + . "rest/bug/$bug_id_1" => {'X-Bugzilla-API-Key' => $editbugs_api_key} => + json => $needinfo_update)->status_is(200); + +### Section 2: Create a new dependent bug + +my $new_bug_2 = { + product => 'Firefox', + component => 'General', + summary => 'This is a new dependent bug', + type => 'defect', + version => 'unspecified', + severity => 'blocker', + description => 'This is a new dependent bug', + depends_on => [$bug_id_1], +}; + +$t->post_ok($url + . 
+    . 'rest/bug' => {'X-Bugzilla-API-Key' => $admin_api_key} => json =>
+    $new_bug_2)->status_is(200)->json_has('/id');
+
+my $bug_id_2 = $t->tx->res->json->{id};
+
+### Section 3: Create an attachment
+
+my $attach_data = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX';
+
+my $new_attach_1 = {
+  is_patch     => 1,
+  comment      => 'This is a new attachment',
+  summary      => 'Test Attachment',
+  content_type => 'text/plain',
+  data         => encode_base64($attach_data),
+  file_name    => 'test_attachment.patch',
+  obsoletes    => [],
+  is_private   => 0,
+  flags        => [{
+    name      => 'review',
+    status    => '?',
+    requestee => $config->{editbugs_user_login},
+  }],
+};
+
+$t->post_ok($url
+    . "rest/bug/$bug_id_1/attachment" =>
+    {'X-Bugzilla-API-Key' => $admin_api_key} => json => $new_attach_1)
+  ->status_is(201)->json_has('/attachments');
+
+my ($attach_id) = keys %{$t->tx->res->json->{attachments}};
+
+### Section 4: Export data to test files
+
+my @cmd = (
+  './extensions/BMO/bin/export_bmo_etl.pl',
+  '--verbose', '--test', '--snapshot-date', $snapshot_date,
+);
+
+my ($output, $error, $rv) = capture { system @cmd; };
+ok(!$rv, 'Data exported to test files without error');
+ok(glob(bz_locations()->{'datadir'} . '/' . $snapshot_date . '-bugs-*.json'),
+  'Export test files exist');
+
+### Section 5: Export data to BigQuery test instance
+
+@cmd = (
+  './extensions/BMO/bin/export_bmo_etl.pl',
+  '--verbose', '--snapshot-date', $snapshot_date,
+);
+
+($output, $error, $rv) = capture { system @cmd; };
+ok(!$rv, 'Data exported to BigQuery test instance without error');
+
+### Section 6: Retrieve data from BigQuery instance and verify
+
+my $query = {
+  query => 'SELECT summary FROM test.bugzilla.bugs WHERE id = '
+    . $bug_id_1
+    . ' AND snapshot_date = \''
+    . $snapshot_date . '\';',
+  useLegacySql => false
+};
+$t->post_ok(
+  'http://bq:9050/bigquery/v2/projects/test/queries' => json => $query)
+  ->status_is(200)->json_is('/rows/0/f/0/v' => $new_bug_1->{summary});
+
+$query = {
+  query => 'SELECT description FROM test.bugzilla.attachments WHERE id = '
+    . $attach_id
+    . ' AND snapshot_date = \''
+    . $snapshot_date . '\';',
+  useLegacySql => false
+};
+$t->post_ok(
+  'http://bq:9050/bigquery/v2/projects/test/queries' => json => $query)
+  ->status_is(200)->json_is('/rows/0/f/0/v' => $new_attach_1->{summary});
+
+$query = {
+  query =>
+    'SELECT depends_on_id FROM test.bugzilla.bug_dependencies WHERE bug_id = '
+    . $bug_id_2
+    . ' AND snapshot_date = \''
+    . $snapshot_date . '\';',
+  useLegacySql => false
+};
+$t->post_ok(
+  'http://bq:9050/bigquery/v2/projects/test/queries' => json => $query)
+  ->status_is(200)->json_is('/rows/0/f/0/v' => $bug_id_1);
+
+$query = {
+  query => 'SELECT bug_id FROM test.bugzilla.flags WHERE bug_id = '
+    . $bug_id_1
+    . ' AND name = \'needinfo\' AND value = \'X\' AND snapshot_date = \''
+    . $snapshot_date . '\'',
+  useLegacySql => false
+};
+$t->post_ok(
+  'http://bq:9050/bigquery/v2/projects/test/queries' => json => $query)
+  ->status_is(200)->json_is('/rows/0/f/0/v' => $bug_id_1);
+
+$query = {
+  query => 'SELECT bug_id FROM test.bugzilla.flags WHERE bug_id = '
+    . $bug_id_1
+    . ' AND name = \'review\' AND attachment_id = '
+    . $attach_id
+    . ' AND snapshot_date = \''
+    . $snapshot_date . '\';',
+  useLegacySql => false
+};
+
+$t->post_ok(
+  'http://bq:9050/bigquery/v2/projects/test/queries' => json => $query)
+  ->status_is(200)->json_is('/rows/0/f/0/v' => $bug_id_1);
+
+### Section 7: Exporting again on the same day (with the same snapshot date) will cause the script to exit
+
+@cmd = (
+  './extensions/BMO/bin/export_bmo_etl.pl',
+  '--verbose', '--snapshot-date', $snapshot_date,
+);
+
+($output, $error, $rv) = capture { system @cmd; };
+ok($rv, 'Duplicate data exported to BigQuery test instance should fail');
+
+done_testing;
diff --git a/extensions/BMO/template/en/default/hook/admin/params/editparams-current_panel.html.tmpl b/extensions/BMO/template/en/default/hook/admin/params/editparams-current_panel.html.tmpl
index 5d3226ec5d..1ce5374bc3 100644
--- a/extensions/BMO/template/en/default/hook/admin/params/editparams-current_panel.html.tmpl
+++ b/extensions/BMO/template/en/default/hook/admin/params/editparams-current_panel.html.tmpl
@@ -18,5 +18,12 @@ ELSIF panel.name == "bugfields";
 ELSIF panel.name == "bugchange";
   panel.param_descs.enable_triaged_keyword = 'Enforce usage of the "triaged" keyword on selected products.';
+ELSIF panel.name == "reports";
+  panel.param_descs.bmo_etl_enabled = 'Enable export to BMO ETL.';
+  panel.param_descs.bmo_etl_base_url = 'The base URL for sending BMO ETL data.';
+  panel.param_descs.bmo_etl_service_account = 'The Google service account for accessing the BMO ETL API.';
+  panel.param_descs.bmo_etl_project_id = 'The project ID for the BMO ETL data.';
+  panel.param_descs.bmo_etl_dataset_id = 'The dataset ID for the BMO ETL data.';
+
 END;
 %]
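
For local verification, the pieces above can be exercised roughly as follows. This is a sketch, not part of the patch: it assumes the `bq` service and 9050 host port mapping defined in the compose files, that `conf/checksetup_answers.txt` has pointed `bmo_etl_base_url` at `http://bq:9050`, that the export commands are run from a shell inside the Bugzilla container, and an illustrative snapshot date.

    # Start the BigQuery emulator, seeded from docker/bigquery/data.yaml
    docker-compose up -d bq

    # Inside the Bugzilla container:
    # dry run -- writes <datadir>/<snapshot-date>-<table>-<count>.json instead of calling the API
    ./extensions/BMO/bin/export_bmo_etl.pl --test --verbose --snapshot-date 2024-01-01

    # real run against the emulator via the insertAll endpoint
    ./extensions/BMO/bin/export_bmo_etl.pl --verbose --snapshot-date 2024-01-01

    # Spot-check from the host through the published port
    curl -s -X POST http://localhost:9050/bigquery/v2/projects/test/queries \
      -H 'Content-Type: application/json' \
      -d '{"query": "SELECT count(*) FROM test.bugzilla.bugs", "useLegacySql": false}'

Running the real export twice with the same snapshot date should fail on the second attempt, since check_for_duplicates() finds rows for that date, mirroring Section 7 of the test above.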