diff --git a/governance/CICD/branch-out-to-new-workspace/AzDO/Branch_out_workspace.yml b/governance/CICD/branch-out-to-new-workspace/AzDO/Branch_out_workspace.yml new file mode 100644 index 0000000..96298c3 --- /dev/null +++ b/governance/CICD/branch-out-to-new-workspace/AzDO/Branch_out_workspace.yml @@ -0,0 +1,92 @@ +# Starter pipeline +# Start with a minimal pipeline that you can customize to build and deploy your code. +# Add steps that build, run tests, deploy, and more: +# https://aka.ms/yaml + +trigger: none + +parameters: +- name: source_workspace + displayName: Enter source workspace + type: string + default: 'Dev_WS_CICDSample_3' + +- name: target_workspace + displayName: Enter target workspace name + type: string + default: 'Dev_WS_CICDSample_Clone5' + +- name: copy_lakehouse_data + displayName: Copy Lakehouse Data (enter True or False) + type: string + default: 'True' + +- name: copy_warehouse_data + displayName: Copy Warehouse Data (enter True or False) + type: string + default: 'False' + +- name: create_lakehouse_shortcuts + displayName: Create lakehouse shortcuts (only if copy lakehouse data set to False) + type: string + default: 'False' + +- name: developer_email + displayName: Enter developer email + type: string + default: 'reportbuilder1@MngEnvMCAP553100.onmicrosoft.com' + +- name: capacity_id + displayName: Enter capacity ID of the new workspace + type: string + default: 'B34D9528-0FF8-4E40-865D-8BA769F574BB' + + +- name: ado_branch + displayName: Enter the source branch name + type: string + default: 'main' + +- name: connections_from_to + displayName: Swap connections in pipelines using names or IDs in (from,to) format + type: string + default: "('4498340c-27cf-4c6e-a025-00e5de6b0726','4498340c-27cf-4c6e-a025-00e5de6b0726')" + +variables: +- group: Fabric_Deployment_Group_S +- group: Fabric_Deployment_Group_NS + +pool: + name: Azure Pipelines + +stages: + - stage: CreateWorkspace + jobs: + - job: 'BranchOut' + steps: + + - script: pip install requests + displayName: 'Install requests' + - script: pip install msal + displayName: 'Install msal' + - script: | + echo Add other tasks to build, test, and deploy your project. 
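The connections_from_to parameter above is passed to the scripts as a single quoted string of ('from','to') pairs and handed on, unchanged, to the post-activity notebook parameter p_connections_from_to. As a rough sketch only (the helper name parse_connection_pairs is illustrative and not part of this sample), a value in that format could be split into pairs like this:

import ast

def parse_connection_pairs(raw: str):
    # Hypothetical helper: "('a','b')" -> [('a', 'b')]; "('a','b'),('c','d')" -> [('a', 'b'), ('c', 'd')]
    if not raw or not raw.strip():
        return []
    value = ast.literal_eval(raw)
    if isinstance(value, tuple) and len(value) == 2 and all(isinstance(v, str) for v in value):
        return [value]  # a single ('from', 'to') pair
    return [tuple(v) for v in value]  # several pairs supplied together

print(parse_connection_pairs("('4498340c-27cf-4c6e-a025-00e5de6b0726','4498340c-27cf-4c6e-a025-00e5de6b0726')"))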
+ echo See https://aka.ms/yaml + + - task: PythonScript@0 + inputs: + scriptSource: 'filePath' + scriptPath: 'scripts/BranchOut-Feature-Workspace-Automation.py' + arguments: '--ADO_ORG_NAME $(ADO_ORG_NAME) --ADO_REPO_NAME $(ADO_REPO_NAME) --ADO_PROJECT_NAME $(ADO_PROJECT_NAME) --ADO_NEW_BRANCH ${{ parameters.target_workspace}} --DEVELOPER ${{ parameters.developer_email }} --WORKSPACE_NAME ${{ parameters.target_workspace }} --CAPACITY_ID ${{ parameters.capacity_id }} --ADO_API_URL $(ADO_API_URL) --ADO_MAIN_BRANCH ${{ parameters.ado_branch }} --TENANT_ID $(TENANT_ID) --FABRIC_TOKEN $(fabrictoken) --ADO_PAT_TOKEN $(azdopat) --CLIENT_ID $(azclientid) --USER_NAME $(username) --PASSWORD $(password)' + #failOnStderr: true + displayName: 'Run Branch-Out-To-New-Workspace Script' + + - task: PythonScript@0 + inputs: + scriptSource: 'filePath' + scriptPath: 'scripts/Run_post_activity.py' + arguments: '--FABRIC_TOKEN $(fabrictoken) --NOTEBOOK_WORKSPACE_ID $(NOTEBOOK_WORKSPACE_ID) --NOTEBOOK_ID $(NOTEBOOK_ID) --SOURCE_WORKSPACE ${{ parameters.source_workspace }} --TARGET_WORKSPACE ${{ parameters.target_workspace }} --TENANT_ID $(TENANT_ID) --COPY_LAKEHOUSE ${{ parameters.copy_lakehouse_data }} --COPY_WAREHOUSE ${{ parameters.copy_warehouse_data }} --CREATE_SHORTCUTS ${{ parameters.create_lakehouse_shortcuts }} --USER_NAME $(username) --PASSWORD $(password) --CLIENT_ID $(azclientid) --CONNECTIONS_FROM_TO ${{ parameters.connections_from_to }}' + #failOnStderr: true + displayName: 'Invoke Fabric Post Activity Job' + + diff --git a/governance/CICD/branch-out-to-new-workspace/AzDO/scripts/BranchOut-Feature-Workspace-Automation.py b/governance/CICD/branch-out-to-new-workspace/AzDO/scripts/BranchOut-Feature-Workspace-Automation.py new file mode 100644 index 0000000..405bda3 --- /dev/null +++ b/governance/CICD/branch-out-to-new-workspace/AzDO/scripts/BranchOut-Feature-Workspace-Automation.py @@ -0,0 +1,383 @@ +import requests +import json +import msal +import argparse +import logging +import base64 +import time + +# Constants +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.info('starting...') + +FABRIC_API_URL = "https://api.fabric.microsoft.com/v1" +ADO_API_URL = "" +CAPACITY_ID = "" +WORKSPACE_NAME = "" +DEVELOPER = "" +ADO_MAIN_BRANCH = "" +ADO_NEW_BRANCH = "" +ADO_PROJECT_NAME = "" +ADO_REPO_NAME = "" +ADO_ORG_NAME = "" +CLIENT_ID = "" +CLIENT_SECRET = "" +TENANT_ID = "" +USERNAME = "" +PASSWORD = "" +FABRIC_TOKEN = "" +ADO_PAT_TOKEN= "" + +logging.info('Starting branch out script....') + +# Define a function to acquire token for ADO using using AAD username password +def acquire_ado_token_user_id_password(tenant_id, client_id,user_name,password,kvtoken): + def encode_pat(pat): + # Encode the PAT in base64 + encoded_pat = base64.b64encode(pat.encode('utf-8')).decode('utf-8') + return encoded_pat + + if kvtoken != "": + logging.info("Using PAT token for ADO authentication as token value has been set in Azure Key Vault") + access_token = encode_pat(':'+kvtoken) + + else: + logging.info("No PAT token was set therefore generating ADO token using user account...") + # Initialize the MSAL public client + authority = f'https://login.microsoftonline.com/{tenant_id}' + app = msal.PublicClientApplication(client_id, authority=authority) + scopes = ['499b84ac-1321-427f-aa17-267ca6975798/.default'] + result = app.acquire_token_by_username_password(user_name, password, scopes) + if 'access_token' in result: + access_token = result['access_token'] 
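# The GUID scope used above ('499b84ac-1321-427f-aa17-267ca6975798/.default') is the well-known
# resource ID for Azure DevOps, so the resulting bearer token is valid for ADO REST calls.
# acquire_token_by_username_password uses the resource owner password (ROPC) flow, which only
# succeeds for accounts without MFA and with public client flows enabled on the app registration;
# supplying a PAT via Key Vault (the branch above) avoids that limitation.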
+ logging.info(" ADO token Generated") + else: + access_token = None + + return access_token + + +# Define a function to acquire token using AAD username password +def acquire_token_user_id_password(tenant_id, client_id,user_name,password): + + # Initialize the MSAL public client + authority = f'https://login.microsoftonline.com/{tenant_id}' + app = msal.PublicClientApplication(client_id, authority=authority) + scopes = ['https://api.fabric.microsoft.com/.default'] + result = app.acquire_token_by_username_password(user_name, password, scopes) + #logging.info('Token result: '+str(result)) + if 'access_token' in result: + access_token = result['access_token'] + else: + access_token = None + logging.error('Error: Token could not be obtained: '+str(result)) + return access_token + +# For Future Use: Define a function to acquire token using SPN +def acquire_token_spn(tenant_id,client_id,client_secret): + app = msal.ConfidentialClientApplication( + client_id, + authority=f"https://login.microsoftonline.com/{tenant_id}", + client_credential=client_secret + ) + result = app.acquire_token_for_client(scopes=SCOPES) + if "access_token" in result: + return result["access_token"] + else: + logging.info(f"Error acquiring token: {result.get('error_description')}") + return None + + +# Function to create a Fabric workspace +def create_fabric_workspace(workspace_name,cpty_id, token): + try: + logging.info(f"Creating Fabric Workspace {WORKSPACE_NAME}... ") + headers = {"Authorization": f"Bearer {token}"} + data = { + "displayName": workspace_name, + "capacityId": cpty_id + } + response = requests.post(f"{FABRIC_API_URL}/workspaces", headers=headers, json=data) + # uncomment the line below if you need more debug information from the http request + logging.info(str(response.status_code) + ' - ' + response.text) + #response.raise_for_status() + if response.status_code == 409: + logging.error(f"Workspace '{workspace_name}' already exists.") + raise ValueError("Fabric workspace already exists. Please specify a new workspace as target.") + elif response.status_code == 201: + logging.info(f"Fabric Workspace {WORKSPACE_NAME} created with ID: {response.json()['id']} successfully... ") + return response.json()["id"] + elif response.status_code != 201: + logging.error(f"Could not create workspace. Error: {response.text}") + return None + else: + logging.error("Unknown error occurred. 
Please review the logs.") + return None + except requests.exceptions.RequestException as e: + logging.error(f"Error creating workspace: {e}") + return None + +# Function to add developers as workspace admins +def add_workspace_admins(workspace_id, developer, token): + try: + logging.info(f"Adding developer {developer} to workspace {WORKSPACE_NAME} in progress") + headers = {"Authorization": f"Bearer {token}"} + data = { + "emailAddress": developer, + "groupUserAccessRight": "Admin" + } + + response = requests.post(f"https://api.powerbi.com/v1.0/myorg/admin/groups/{workspace_id}/users", headers=headers, json=data) + + response.raise_for_status() + logging.info("Done") + + except requests.exceptions.RequestException as e: + logging.info(f"Error adding workspace admin: {e}") + #os._exit(1) + + +# Function to create a new branch in Azure DevOps +def create_azure_devops_branch(project_name, repo_name, main_branch, new_branch): + # Acquire an ADO token (PAT from Key Vault if set, otherwise username/password) + token = acquire_ado_token_user_id_password(TENANT_ID, CLIENT_ID,USERNAME,PASSWORD, ADO_PAT_TOKEN) + + if token: + + try: + if ADO_PAT_TOKEN != "": + token_type = 'Basic' + else: + token_type = 'Bearer' + + logging.info(f"Using token type {token_type} in request header. This is determined by whether an AZDO PAT token value has been set.") + headers = {"Authorization": f"{token_type} {token}", "Content-Type": "application/json"} + data = [ + { + "name":f"refs/heads/{new_branch}", + "oldObjectId": "0000000000000000000000000000000000000000", + "newObjectId": get_branch_object_id(project_name, repo_name, main_branch, token, token_type) + } + ] + logging.info(f"Creating feature branch {new_branch} based on {main_branch}...") + response = requests.post(f"{ADO_API_URL}/{ADO_ORG_NAME}/{project_name}/_apis/git/repositories/{repo_name}/refs?api-version=7.1", headers=headers, json=data) + response.raise_for_status() + logging.info(f"Feature branch {new_branch} created") + except requests.exceptions.RequestException as e: + logging.info(f"Error creating Azure DevOps branch: {e}") + #os._exit(1) + + else: + logging.error("Terminating branch out process as token could not be generated. Please either set an AZDO PAT token or specify a valid user account with sufficient permissions. 
") + raise ValueError("Could not generate AZDO token.") + + +# Helper function to get the object ID of a branch +def get_branch_object_id(project_name, repo_name, branch_name, token, token_type): + try: + logging.info(f"Retriving ID of main branch {branch_name} to be cloned ") + headers = {"Authorization": f"{token_type} {token}"} + response = requests.get(f"{ADO_API_URL}/{ADO_ORG_NAME}/{project_name}/_apis/git/repositories/{repo_name}/refs/heads/{branch_name}?api-version=7.1", headers=headers) + response.raise_for_status() + logging.info(f"BranchID: {response.text}") + return response.json()["value"][0]["objectId"] + except requests.exceptions.RequestException as e: + logging.info(f"Error getting branch object ID: {e}") + return None + +# Function to connect Azure DevOps branch to Fabric workspace +def connect_branch_to_workspace(workspace_id, project_name, org_name, repo_name, branch_name, token): + try: + logging.info(f"Conecting workspace {workspace_id} to feature branch {branch_name} is in progess..") + headers = {"Authorization": f"Bearer {token}"} + data = { + "gitProviderDetails": { + "organizationName": org_name, + "projectName": project_name, + "gitProviderType": "AzureDevOps", + "repositoryName": repo_name, + "branchName": branch_name, + "directoryName": "" + } + } + response = requests.post(f"{FABRIC_API_URL}/workspaces/{workspace_id}/git/connect", headers=headers, json=data) + response.raise_for_status() + except requests.exceptions.RequestException as e: + logging.info(f"Error connecting branch to workspace: {e}") + + +def long_running_operation_polling(uri,retry_after,headers): + keep_polling = True + try: + logging.info(f"Polling long running operation ID {uri} has been started with a retry-after time of {retry_after} seconds.") + while keep_polling: + response = requests.get(uri,headers=headers) + operation_state = response.json() + logging.info('operation state = '+str(operation_state)) + logging.info(f"Long running operation status: {operation_state['status']}") + if operation_state['status'] in ["NotStarted", "Running"]: + time.sleep(retry_after) + keep_polling = True + else: + keep_polling = False + if operation_state['status'] == "Failed": + logging.info(f"The long running operation has been completed with failure. Error response: {json.dumps(operation_state['Error'])}") + else: + logging.info("The long running operation has been successfully completed.") + #response = client.get(uri+'/result') + return operation_state['status'] + except Exception as e: + logging.error(f"The long running operation has been completed with failure. Error response: {e}") + +def initialize_workspace_from_git(workspace_id,token): + + try: + logging.info(f"Connecting f{WORKSPACE_NAME} to feature branch {ADO_NEW_BRANCH} is in propress... 
") + headers = {"Authorization": f"Bearer {token}"} + # Initialize the connection to the GIT repository + gitinitializeurl = f"{FABRIC_API_URL}/workspaces/{workspace_id}/git/initializeConnection" + response = requests.post(gitinitializeurl, headers=headers) + + #print(response.json()) + + if response.status_code == 200: + git_status = response.json() + remote_commit_hash = git_status['remoteCommitHash'] + workspace_head = git_status['workspaceHead'] + + # Define the update parameters with conflict resolution policy + update_params = { + 'workspaceHead': workspace_head, + 'remoteCommitHash': remote_commit_hash, + 'options': { + 'allowOverrideItems': True, + 'conflictResolution': 'RemoteSync' # Set conflict resolution to RemoteSync + } + } + + # Update the workspace + updateworkspaceAllurl = f"{FABRIC_API_URL}/workspaces/{workspace_id}/git/updateFromGit" + update_response = requests.post(updateworkspaceAllurl, headers=headers, json=update_params) + + if update_response.status_code == 200: + git_status = update_response.json() + logging.info(f"Feature workspace {WORKSPACE_NAME} is synchronizing with feature branch {ADO_NEW_BRANCH} ") + #print(git_status) + elif update_response.status_code == 202: + logging.info('Request accepted, update workspace is in progress...') + location_url = update_response.headers.get("Location") + logging.info(f"Polling URL to track operation status is {location_url}") + time.sleep(15) + response = long_running_operation_polling(location_url, 15, headers) + else: + logging.error(f'Failed to update the workspace. Status Code: {update_response.status_code} - {update_response.text}') + + elif response.status_code == 202: + logging.info('Request accepted, get initialize in progress. Retry after some time') + + else: + logging.info(f'Failed to Git initialize. 
Status Code: {response.status_code}') + + except requests.exceptions.RequestException as e: + logging.error(f"An error occurred: {e}") + +def set_main_parameters(): + global TENANT_ID + global USERNAME + global PASSWORD + global WORKSPACE_NAME + global DEVELOPER + global ADO_MAIN_BRANCH + global ADO_NEW_BRANCH + global ADO_PROJECT_NAME + global ADO_REPO_NAME + global ADO_ORG_NAME + global ADO_API_URL + global CLIENT_ID + global CLIENT_SECRET + global CAPACITY_ID + global FABRIC_TOKEN + global ADO_PAT_TOKEN + + try: + parser = argparse.ArgumentParser() + parser.add_argument('--ADO_ORG_NAME',type=str, help= 'ADO organization name') + parser.add_argument('--TENANT_ID',type=str, help= 'TenantID passed from Devops') + parser.add_argument('--CLIENT_ID',type=str, help= 'ClientID passed from Devops') + #parser.add_argument('--CLIENT_SECRET',type=str, help= 'CLIENTSECRET passed from Devops') + parser.add_argument('--USER_NAME',type=str, help= 'User Name passed from Devops') + parser.add_argument('--PASSWORD',type=str, help= 'User password passed from Devops') + parser.add_argument('--WORKSPACE_NAME',type=str, help= 'Name of the feature workspace to be created') + parser.add_argument('--DEVELOPER',type=str, help= 'Developer UPN to be added to workspace as admin') + parser.add_argument('--ADO_MAIN_BRANCH',type=str, help= 'Main development branch') + parser.add_argument('--ADO_NEW_BRANCH',type=str, help= 'New branch to be created') + parser.add_argument('--ADO_PROJECT_NAME',type=str, help= 'ADO project name') + parser.add_argument('--ADO_REPO_NAME',type=str, help= 'ADO repository name') + parser.add_argument('--ADO_API_URL',type=str, help= 'ADO API base URL') + parser.add_argument('--CAPACITY_ID',type=str, help= 'Capacity ID to assign the workspace') + parser.add_argument('--FABRIC_TOKEN',type=str, help= 'Fabric user token') + parser.add_argument('--ADO_PAT_TOKEN',type=str, help= 'ADO PAT token') + + args = parser.parse_args() + except Exception as e: + logging.error(f'Error: {e}') + raise ValueError(f"Could not extract parameters: {e}") + + #Bind parameters to script variables + TENANT_ID = args.TENANT_ID + USERNAME = args.USER_NAME + PASSWORD = args.PASSWORD + WORKSPACE_NAME = args.WORKSPACE_NAME + DEVELOPER = args.DEVELOPER + ADO_MAIN_BRANCH = args.ADO_MAIN_BRANCH + ADO_NEW_BRANCH = args.ADO_NEW_BRANCH + ADO_PROJECT_NAME = args.ADO_PROJECT_NAME + ADO_REPO_NAME = args.ADO_REPO_NAME + ADO_ORG_NAME = args.ADO_ORG_NAME + ADO_API_URL = args.ADO_API_URL + CLIENT_ID = args.CLIENT_ID + CAPACITY_ID = args.CAPACITY_ID + FABRIC_TOKEN = args.FABRIC_TOKEN + ADO_PAT_TOKEN = args.ADO_PAT_TOKEN + + # For future use when service principal is supported + #CLIENT_SECRET = args.CLIENT_SECRET + + +def main(): + logging.info('In main....') + + set_main_parameters() + token = "" + if FABRIC_TOKEN != "": + logging.info('Fabric token found, fetching token...') + token = FABRIC_TOKEN + else: + logging.info('Service account found, generating token...') + token = acquire_token_user_id_password(TENANT_ID, CLIENT_ID,USERNAME,PASSWORD) + + if token: + logging.info('Invoking new workspace routine...') + workspace_id = create_fabric_workspace(WORKSPACE_NAME, CAPACITY_ID, token) + if workspace_id: + logging.info(f'Workspace {WORKSPACE_NAME} ({workspace_id}) successfully created and assigned to capacity {CAPACITY_ID}') + logging.info(f'Adding workspace admins {DEVELOPER}...') + add_workspace_admins(workspace_id, DEVELOPER, token) + logging.info(f'Creating ADO branch from main {ADO_MAIN_BRANCH}...') + 
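# create_azure_devops_branch (below) creates the feature branch by POSTing a new ref named
# refs/heads/{ADO_NEW_BRANCH} with an all-zero oldObjectId and a newObjectId set to the tip commit
# of ADO_MAIN_BRANCH (resolved via get_branch_object_id), so the branch starts from the current
# head of the main branch before the new workspace is connected to it.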
create_azure_devops_branch(ADO_PROJECT_NAME, ADO_REPO_NAME, ADO_MAIN_BRANCH, ADO_NEW_BRANCH) + logging.info(f'Connecting workspace to branch {ADO_NEW_BRANCH}...') + connect_branch_to_workspace(workspace_id, ADO_PROJECT_NAME, ADO_ORG_NAME,ADO_REPO_NAME, ADO_NEW_BRANCH, token) + logging.info('Initialize workspace...') + initialize_workspace_from_git(workspace_id, token) + else: + logging.error("Terminating branch out process as target workspace could not be created. Please review the logs and ensure you have required permissions on the Fabric tenant. If using a Fabric token also ensure that a valid token has been generated within the last hour. ") + raise ValueError("Could not create Fabric workspace.") + + else: + logging.error("Terminating branch out process due to credential error. Please use either a valid user account where MFA is not required or generate a recent (within 1 hour) valid Fabric token and store in the referenced Key Vault. ") + raise ValueError("Could not generate authentication token. Please review the debug logs.") + +if __name__ == "__main__": + main() + diff --git a/governance/CICD/branch-out-to-new-workspace/AzDO/scripts/Run_post_activity.py b/governance/CICD/branch-out-to-new-workspace/AzDO/scripts/Run_post_activity.py new file mode 100644 index 0000000..a191070 --- /dev/null +++ b/governance/CICD/branch-out-to-new-workspace/AzDO/scripts/Run_post_activity.py @@ -0,0 +1,109 @@ +import requests +import json +import msal +import argparse +import os +import logging + +# Constants +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.info('Starting. Parsing arguments...') + +FABRIC_TOKEN = "" +WS_ID = "" +NOTEBOOK_ID = "" +TENANT_ID = "" +CLIENT_ID = "" +USERNAME = "" +PASSWORD = "" +CONNECTIONS_FROM_TO = "" + + +parser = argparse.ArgumentParser() +parser.add_argument('--FABRIC_TOKEN',type=str, help= 'Fabric user token') +parser.add_argument('--SOURCE_WORKSPACE',type=str, help= 'Source workspace') +parser.add_argument('--COPY_LAKEHOUSE',type=str, help= 'Copy lakehouse data from source to target') +parser.add_argument('--CREATE_SHORTCUTS',type=str, help= 'Create shortcuts back to source lakehouse in target lakehouse') +parser.add_argument('--COPY_WAREHOUSE',type=str, help= 'Copy warehouse data') +parser.add_argument('--TARGET_WORKSPACE',type=str, help= 'Target workspace') +parser.add_argument('--NOTEBOOK_WORKSPACE_ID',type=str, help= 'Workspace GUID where the post activity notebook is saved') +parser.add_argument('--NOTEBOOK_ID',type=str, help= 'GUID of the post activity notebook') +parser.add_argument('--TENANT_ID',type=str, help= 'Tenant ID of the service principal/user ') +parser.add_argument('--CLIENT_ID',type=str, help= 'ClientID of the service principal/user') +parser.add_argument('--USER_NAME',type=str, help= 'User Name passed from Devops') +parser.add_argument('--PASSWORD',type=str, help= 'User password passed from Devops') +parser.add_argument('--CONNECTIONS_FROM_TO',type=str, help= 'Connections change from a UUID or name to UUID or name') + + +args = parser.parse_args() +FABRIC_TOKEN = args.FABRIC_TOKEN +SOURCE_WS = args.SOURCE_WORKSPACE +TARGET_WS = args.TARGET_WORKSPACE +COPY_LH = args.COPY_LAKEHOUSE +COPY_WH = args.COPY_WAREHOUSE +CREATE_SC = args.CREATE_SHORTCUTS +WS_ID = args.NOTEBOOK_WORKSPACE_ID +NOTEBOOK_ID = args.NOTEBOOK_ID +TENANT_ID = args.TENANT_ID +CLIENT_ID = args.CLIENT_ID +USERNAME = args.USER_NAME +PASSWORD = args.PASSWORD +CONNECTIONS_FROM_TO = 
args.CONNECTIONS_FROM_TO + +def acquire_token_user_id_password(tenant_id, client_id,user_name,password): + + # Initialize the MSAL public client + logging.info("Generating Token for Microsoft Fabric in progress...") + authority = f'https://login.microsoftonline.com/{tenant_id}' + app = msal.PublicClientApplication(client_id, authority=authority) + scopes = ['https://api.fabric.microsoft.com/.default'] + result = app.acquire_token_by_username_password(user_name, password, scopes) + #logging.info('Token result: '+str(result)) + if 'access_token' in result: + access_token = result['access_token'] + logging.info("Token for Microsoft Fabric generated") + + else: + access_token = None + logging.error('Error: Token could not be obtained: '+str(result)) + return access_token + +logging.info('Checking for supplied credentials...') +if FABRIC_TOKEN!="": + logging.info('Fabric token found...') + token = FABRIC_TOKEN +else: + logging.info('User creds found, generating token...') + token = acquire_token_user_id_password(TENANT_ID,CLIENT_ID,USERNAME,PASSWORD) + +if token: + if NOTEBOOK_ID == '': + raise ValueError('Error: Could not execute notebook as no Notebook ID has been specified.') + + + plurl = 'https://api.fabric.microsoft.com/v1/workspaces/'+WS_ID +'/items/'+NOTEBOOK_ID+'/jobs/instances?jobType=RunNotebook' + + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json" # Set the content type based on your request + } + logging.info('Setting notebook parameters...') + # Parameter names below must match the tagged parameters cell of the post-activity notebook + payload_data = '{' \ + '"executionData": {' \ + '"parameters": {' \ + '"_inlineInstallationEnabled": {"value": "True", "type": "bool"},' \ + '"source_ws": {"value": "' + SOURCE_WS + '", "type": "string"},' \ + '"copy_lakehouse_data": {"value": "' + COPY_LH + '", "type": "bool"},' \ + '"create_lakehouse_shortcuts": {"value": "' + CREATE_SC + '", "type": "bool"},' \ + '"copy_warehouse_data": {"value": "' + COPY_WH + '", "type": "bool"},' \ + '"target_ws": {"value": "' + TARGET_WS + '", "type": "string"},' \ + '"p_connections_from_to": {"value": "' + CONNECTIONS_FROM_TO + '", "type": "string"}' \ + '}}}' + logging.info('Invoking Fabric notebook job...') + plresponse = requests.post(plurl, json=json.loads(payload_data), headers=headers) + logging.info(str(plresponse.status_code) + ' - ' + plresponse.text) +else: + logging.error("Could not acquire token") + raise ValueError("Could not generate authentication token. 
Please review the debug logs.") + diff --git a/governance/CICD/branch-out-to-new-workspace/Branch Out To New Workspace.pdf b/governance/CICD/branch-out-to-new-workspace/Branch Out To New Workspace.pdf new file mode 100644 index 0000000..a1d1c2c Binary files /dev/null and b/governance/CICD/branch-out-to-new-workspace/Branch Out To New Workspace.pdf differ diff --git a/governance/CICD/branch-out-to-new-workspace/Fabric/Branch out to new workspace - Post Activity.ipynb b/governance/CICD/branch-out-to-new-workspace/Fabric/Branch out to new workspace - Post Activity.ipynb new file mode 100644 index 0000000..77fa5a2 --- /dev/null +++ b/governance/CICD/branch-out-to-new-workspace/Fabric/Branch out to new workspace - Post Activity.ipynb @@ -0,0 +1 @@ +{"cells":[{"cell_type":"code","source":["%pip -q install semantic-link-labs\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":true},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"1b03316d-c088-4a0e-a2f0-44d45d112121"},{"cell_type":"code","source":["%pip install jmespath"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"c68be6ba-7648-457f-af82-f1987d12d7f7"},{"cell_type":"code","source":["source_ws = ''\n","target_ws = ''\n","\n","# Either copy lakehouse data or create shortcuts, set at most one of these to True \n","copy_lakehouse_data = True\n","copy_warehouse_data = True\n","create_lakehouse_shortcuts = False\n","\n","\n","# If false then shortcuts will be created. If you wish to create shortcuts based on a pattern match please set the param below\n","# enter pattern match for creating shortcuts - see https://github.com/arasdk/fabric-code-samples/blob/main/shortcuts/fabric_shortcut_creator.py \n","PATTERN_MATCH = [\"*\"]\n","_inlineInstallationEnabled = True\n","\n","\n","p_connections_from_to = ()#('https://api.fabric.microsoft.com/v1/workspaces/ admin','4498340c-27cf-4c6e-a025-00e5de6b0726'),('4498340c-27cf-4c6e-a025-00e5de6b0726','https://api.fabric.microsoft.com/v1/workspaces/ admin'),('https://api.fabric.microsoft.com/v1/workspaces/ admin','4498340c-27cf-4c6e-a025-00e5de6b0726')"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"tags":["parameters"]},"id":"90efaa4f-846d-4924-900e-258837a3467d"},{"cell_type":"markdown","source":["##### Branch out to new workspace notebook\n","\n","This notebook runs post activity tasks after [branch out to new workspace functionality](https://blog.fabric.microsoft.com/en-us/blog/introducing-new-branching-capabilities-in-fabric-git-integration).\n","\n","In addition to this:\n","\n","\n","Requirements:\n","\n","\n","Limitations of current script:\n","\n","\n","\n","\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"a98b6d0a-7a36-4116-ab0d-aa70144eb737"},{"cell_type":"markdown","source":["##### Install semantic link labs to support advanced 
functionality\n","\n","https://semantic-link-labs.readthedocs.io/en/latest/index.html\n","https://github.com/microsoft/semantic-link-labs/blob/main/README.md\n","\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"3b887bd6-a9c9-430f-b58f-b58a93f5ce29"},{"cell_type":"markdown","source":["##### Install Jmespath to make data pipeline changes such as updating linked notebooks, warehouses and lakehouses "],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"8a74ed11-dd64-43bb-a735-906a947c8666"},{"cell_type":"markdown","source":["##### Set parameters\n","Before running this notebook ensure these parameters are set correctly. If necessary these can be passed in via a data factory pipeline"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"dee81614-b92b-4242-890a-b11f97b1a640"},{"cell_type":"markdown","source":["##### Library imports and fabric rest client setup\n","\n","https://learn.microsoft.com/en-us/python/api/semantic-link-sempy/sempy.fabric.fabricrestclient"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"4fb01e1d-ec4e-4c69-b544-66f6d8c5a475"},{"cell_type":"code","source":["import pandas as pd\n","import datetime, time\n","import re,json, fnmatch,os\n","import requests, base64\n","import sempy\n","import sempy.fabric as fabric\n","from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException\n","from pyspark.sql import DataFrame\n","from pyspark.sql.functions import col,current_timestamp,lit\n","import sempy_labs as labs\n","from sempy_labs import migration, directlake\n","from sempy_labs import lakehouse as lake\n","from sempy_labs import report as rep\n","from sempy_labs.tom import connect_semantic_model\n","\n","# instantiate the Fabric rest client\n","client = fabric.FabricRestClient()\n","\n","# get the current workspace ID based on the context of where this notebook is run from\n","thisWsId = notebookutils.runtime.context['currentWorkspaceId']\n","thisWsName = notebookutils.runtime.context['currentWorkspaceName']\n","\n","source_ws_id = fabric.resolve_workspace_id(source_ws)\n","target_ws_id = fabric.resolve_workspace_id(target_ws)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"391624c1-b299-452d-9ebf-f32626d49970"},{"cell_type":"markdown","source":["##### Update default lakehouses for notebooks\n","\n","Update notebook dependencies based on but now supports T-SQL notebooks:\n","https://github.com/PowerBiDevCamp/FabConWorkshopSweden/blob/main/DemoFiles/GitUpdateWorkspace/updateWorkspaceDependencies_v1.ipynb\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"aaae8a08-588d-4dd8-9d2c-2200b7a88d30"},{"cell_type":"code","source":["\n","for notebook in notebookutils.notebook.list(workspaceId=target_ws_id):\n"," if True: #notebook.displayName == 'T-SQL_Notebook': #notebook.displayName != 'Create Feature Branch':\n","\n"," # Get the current notebook definition\n"," notebook_def = notebookutils.notebook.getDefinition(notebook.displayName,workspaceId=source_ws_id)\n"," json_payload = 
json.loads(notebook_def)\n"," #print(json.dumps(json_payload, indent=4))\n"," # Check and remove any attached lakehouses\n"," if 'dependencies' in json_payload['metadata'] \\\n"," and 'lakehouse' in json_payload['metadata']['dependencies'] \\\n"," and json_payload['metadata'][\"dependencies\"][\"lakehouse\"] is not None:\n"," # Remove all lakehouses\n"," current_lakehouse = json_payload['metadata']['dependencies']['lakehouse']\n"," if 'default_lakehouse_name' in current_lakehouse:\n"," json_payload['metadata']['dependencies']['lakehouse'] = {}\n"," print(f\"Attempting to update notebook {notebook.displayName} with new default lakehouse: {current_lakehouse['default_lakehouse_name']} in {target_ws}\")\n","\n"," #Update new notebook definition after removing existing lakehouses and with new default lakehouseId\n"," (notebookutils.notebook.updateDefinition(\n"," name = notebook.displayName,\n"," content = json.dumps(json_payload), \n"," defaultLakehouse = current_lakehouse['default_lakehouse_name'],\n"," defaultLakehouseWorkspace = target_ws_id,\n"," workspaceId = target_ws_id\n"," )\n"," )\n"," print(f\"Updated notebook {notebook.displayName} in {target_ws}\")\n"," else:\n"," print(f'No default lakehouse set for notebook {notebook.displayName}, ignoring.')\n","\n"," if 'dependencies' in json_payload['metadata'] and 'warehouse' in json_payload['metadata']['dependencies']:\n"," # Fetch existing details\n"," current_warehouse = json_payload['metadata']['dependencies']['warehouse']\n"," current_warehouse_id = current_warehouse['default_warehouse']\n"," source_wh_name = fabric.resolve_item_name(item_id = current_warehouse_id,workspace=source_ws_id)\n"," #print('Source warehouse name is ' + source_wh_name)\n"," target_wh_id = fabric.resolve_item_id(item_name = source_wh_name,type='Warehouse',workspace=target_ws_id)\n","\n"," if 'default_warehouse' in current_warehouse:\n"," #json_payload['metadata']['dependencies']['warehouse'] = {}\n"," print(f\"Attempting to update notebook {notebook.displayName} with new default warehouse: {target_wh_id} in {target_ws}\")\n"," \n"," #Update new notebook definition after removing existing lakehouses and with new default lakehouseId\n"," json_payload['metadata']['dependencies']['warehouse']['default_warehouse'] = target_wh_id\n"," for warehouse in json_payload['metadata']['dependencies']['warehouse']['known_warehouses']:\n"," if warehouse['id'] == current_warehouse_id:\n"," warehouse['id'] = target_wh_id\n"," #print(json.dumps(json_payload, indent=4))\n"," (notebookutils.notebook.updateDefinition(\n"," name = notebook.displayName,\n"," content = json.dumps(json_payload),\n"," workspaceId = target_ws_id\n"," )\n"," )\n"," print(f\"Updated notebook {notebook.displayName} in {target_ws}\")\n","\n"," else:\n"," print(f\"No default warehouse was found in the source notebook {notebook.displayName} there cannot set default for target\")\n","\n","\n"," else:\n"," print(f'No default lakehouse set for notebook {notebook.displayName}, ignoring.')"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"5c60b5d2-f83c-46f8-9870-9fd609166b67"},{"cell_type":"markdown","source":["##### Run the below cell - contains utility functions to support lakehouse and warehouse initialisation\n","\n","Shortcut creator:\n","https://github.com/arasdk/fabric-code-samples/blob/main/shortcuts/fabric_shortcut_creator.py 
"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"a47a56df-219d-4d6c-b950-491909638deb"},{"cell_type":"code","source":["##### \n","### Shortcut utility function \n","####\n","\n","# Extract workspace_id, item_id and path from a onelake URI\n","def extract_onelake_https_uri_components(uri):\n"," # Define a regular expression to match any string between slashes and capture the final path element(s) without the leading slash\n"," pattern = re.compile(r\"abfss://([^@]+)@[^/]+/([^/]+)/(.*)\")\n"," match = pattern.search(uri)\n"," if match:\n"," workspace_id, item_id, path = match.groups()\n"," return workspace_id, item_id, path\n"," else:\n"," return None, None, None\n","\n","\n","def is_valid_onelake_uri(uri: str) -> bool:\n"," workspace_id, item_id, path = extract_onelake_https_uri_components(uri)\n"," if \"abfss://\" not in uri or workspace_id is None or item_id is None or path is None:\n"," return False\n","\n"," return True\n","\n","\n","def get_last_path_segment(uri: str):\n"," path = uri.split(\"/\") # Split the entire URI by '/'\n"," return path[-1] if path else None\n","\n","\n","def is_delta_table(uri: str):\n"," delta_log_path = os.path.join(uri, \"_delta_log\")\n"," return mssparkutils.fs.exists(delta_log_path)\n","\n","\n","def get_onelake_shorcut(workspace_id: str, item_id: str, path: str, name: str):\n"," shortcut_uri = (\n"," f\"v1/workspaces/{workspace_id}/items/{item_id}/shortcuts/{path}/{name}\"\n"," )\n"," result = client.get(shortcut_uri).json()\n"," return result\n","\n","\n","def is_folder_matching_pattern(path: str, folder_name: str, patterns: []):\n"," if folder_name in patterns:\n"," return True\n"," else:\n"," for pattern in patterns:\n"," if fnmatch.fnmatch(folder_name, pattern):\n"," return is_delta_table(path)\n","\n"," return False\n","\n","\n","def get_matching_delta_tables_uris(uri: str, patterns: []) -> []:\n"," # Use a set to avoid duplicates\n"," matched_uris = set()\n"," files = mssparkutils.fs.ls(uri)\n"," folders = [item for item in files if item.isDir]\n","\n"," # Filter folders to only those that matches the pattern and is a delta table\n"," matched_uris.update(\n"," folder.path\n"," for folder in folders\n"," if is_folder_matching_pattern(folder.path, folder.name, patterns)\n"," )\n","\n"," return matched_uris\n","\n","\n","def create_onelake_shorcut(source_uri: str, dest_uri: str):\n"," src_workspace_id, src_item_id, src_path = extract_onelake_https_uri_components(\n"," source_uri\n"," )\n","\n"," dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(\n"," dest_uri\n"," )\n","\n"," name = get_last_path_segment(source_uri)\n"," dest_uri_joined = os.path.join(dest_uri, name)\n","\n"," # If the destination path already exists, return without creating shortcut\n"," if mssparkutils.fs.exists(dest_uri_joined):\n"," print(f\"Destination already exists: {dest_uri_joined}\")\n"," return None\n","\n"," request_body = {\n"," \"name\": name,\n"," \"path\": dest_path,\n"," \"target\": {\n"," \"oneLake\": {\n"," \"itemId\": src_item_id,\n"," \"path\": src_path,\n"," \"workspaceId\": src_workspace_id,\n"," }\n"," },\n"," }\n","\n"," shortcut_uri = f\"v1/workspaces/{dest_workspace_id}/items/{dest_item_id}/shortcuts\"\n"," print(f\"Creating shortcut: {shortcut_uri}/{name}..\")\n"," try:\n"," client.post(shortcut_uri, json=request_body)\n"," except FabricHTTPException as e:\n"," print(e)\n"," return None\n","\n"," return get_onelake_shorcut(dest_workspace_id, 
dest_item_id, dest_path, name)\n"," \n","\n","####\n","## Copy lakehouse and warehouse utility functions\n","####\n","\n","def get_lh_object_list(base_path,data_types = ['Tables', 'Files'])->pd.DataFrame:\n","\n"," '''\n"," Function to get a list of tables for a lakehouse\n"," adapted from https://fabric.guru/getting-a-list-of-folders-and-delta-tables-in-the-fabric-lakehouse\n"," This function will return a pandas dataframe containing names and abfss paths of each folder for Files and Tables\n"," '''\n"," #data_types = ['Tables', 'Files'] #for if you want a list of files and tables\n"," #data_types = ['Tables'] #for if you want a list of tables\n","\n"," df = pd.concat([\n"," pd.DataFrame({\n"," 'name': [item.name for item in notebookutils.fs.ls(f'{base_path}/{data_type}/')],\n"," 'type': data_type[:-1].lower() , \n"," 'src_path': [item.path for item in notebookutils.fs.ls(f'{base_path}/{data_type}/')],\n"," }) for data_type in data_types], ignore_index=True)\n","\n"," return df\n","\n","def get_wh_object_list(schema_list,base_path)->pd.DataFrame:\n","\n"," '''\n"," Function to get a list of tables for a warehouse by schema\n"," '''\n"," data_type = 'Tables'\n"," dfs = []\n","\n"," for schema_prefix in schema_list:\n"," if notebookutils.fs.exists(f'{base_path}/{data_type}/{schema_prefix}/'):\n"," items = notebookutils.fs.ls(f'{base_path}/{data_type}/{schema_prefix}/')\n"," if items: # Check if the list is not empty\n"," df = pd.DataFrame({\n"," 'schema': schema_prefix,\n"," 'name': [item.name for item in items],\n"," 'type': data_type[:-1].lower(),\n"," 'src_path': [item.path for item in items],\n"," })\n"," dfs.append(df)\n","\n"," if dfs: # Check if the list of dataframes is not empty\n"," df = pd.concat(dfs, ignore_index=True)\n"," else:\n"," df = pd.DataFrame() # Return an empty dataframe if no dataframes were created\n","\n"," return df\n","\n","def copy_lh_objects(table_list,workspace_src,workspace_tgt,lakehouse_src,lakehouse_tgt,fastcopy=True,usingIDs=False)->pd.DataFrame:\n"," # declare an array to keep the instrumentation\n"," cpresult = []\n"," # loop through all the tables to extract the source path \n"," for table in table_list.src_path:\n"," source = table\n"," destination = source.replace(f'abfss://{workspace_src}', f'abfss://{workspace_tgt}')\n"," if usingIDs:\n"," destination = destination.replace(f'{lakehouse_src}', f'{lakehouse_tgt}')\n"," else:\n"," destination = destination.replace(f'{lakehouse_src}.Lakehouse', f'{lakehouse_tgt}.Lakehouse')\n"," start_time = datetime.datetime.now()\n"," if notebookutils.fs.exists(destination):\n"," notebookutils.fs.rm(destination, True)\n"," if fastcopy:\n"," # use fastcopy util which is a python wrapper to azcopy\n"," notebookutils.fs.fastcp(source+'/*', destination+'/', True)\n"," else:\n"," notebookutils.fs.cp(source, destination, True)\n","\n"," # recording the timing and add it to the results list\n"," end_time = datetime.datetime.now()\n"," copyreslist = [source, destination, start_time.strftime(\"%Y-%m-%d %H:%M:%S\"), end_time.strftime(\"%Y-%m-%d %H:%M:%S\"), str((end_time - start_time).total_seconds())]\n"," cpresult.append(copyreslist)\n"," return pd.DataFrame(cpresult,columns =['source--------------------------------------','target--------------------------------------','start------------','end_time------------','elapsed seconds----'])\n","\n","def createDWrecoverypl(ws_id,pl_name = 'Recover_Warehouse_Data_From_DR'):\n"," client = fabric.FabricRestClient()\n","\n"," dfurl= \"v1/workspaces/\"+ ws_id + \"/items\"\n"," payload = 
{ \n"," \"displayName\": pl_name, \n"," \"type\": \"DataPipeline\", \n"," \"definition\": { \n"," \"parts\": [ \n"," { \n"," \"path\": \"pipeline-content.json\", \n"," \"payload\": \"{
    "properties": {
        "activities": [
            {
                "name": "IterateSchemaTables",
                "type": "ForEach",
                "dependsOn": [],
                "typeProperties": {
                    "items": {
                        "value": "@pipeline().parameters.tablesToCopy",
                        "type": "Expression"
                    },
                    "batchCount": 20,
                    "activities": [
                        {
                            "name": "CopyWarehouseTables",
                            "type": "Copy",
                            "dependsOn": [
                                {
                                    "activity": "Set table",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 2,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "DataWarehouseSource",
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None",
                                    "datasetSettings": {
                                        "annotations": [],
                                        "linkedService": {
                                            "name": "07a03006_d1b6_4a39_beb1_0bba2aaf5ff7",
                                            "properties": {
                                                "annotations": [],
                                                "type": "DataWarehouse",
                                                "typeProperties": {
                                                    "endpoint": "@pipeline().parameters.lakehouseConnStr",
                                                    "artifactId": "@pipeline().parameters.lakehouseId",
                                                    "workspaceId": "@pipeline().parameters.workspaceId"
                                                }
                                            }
                                        },
                                        "type": "DataWarehouseTable",
                                        "schema": [],
                                        "typeProperties": {
                                            "schema": "dbo",
                                            "table": {
                                                "value": "@concat(concat(item().schema,'_'),item().name)",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                },
                                "sink": {
                                    "type": "DataWarehouseSink",
                                    "allowCopyCommand": true,
                                    "tableOption": "autoCreate",
                                    "datasetSettings": {
                                        "annotations": [],
                                        "linkedService": {
                                            "name": "0c03123a_d312_46c4_a8e7_5b4cad8f12d7",
                                            "properties": {
                                                "annotations": [],
                                                "type": "DataWarehouse",
                                                "typeProperties": {
                                                    "endpoint": "@pipeline().parameters.warehouseConnStr",
                                                    "artifactId": "@pipeline().parameters.warehouseId",
                                                    "workspaceId": "@pipeline().parameters.workspaceId"
                                                }
                                            }
                                        },
                                        "type": "DataWarehouseTable",
                                        "schema": [],
                                        "typeProperties": {
                                            "schema": "dbo",
                                            "table": {
                                                "value": "@item().name",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                },
                                "enableStaging": true,
                                "translator": {
                                    "type": "TabularTranslator",
                                    "typeConversion": true,
                                    "typeConversionSettings": {
                                        "allowDataTruncation": true,
                                        "treatBooleanAsNumber": false
                                    }
                                }
                            }
                        },
                        {
                            "name": "Set table",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Set schema",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "variableName": "Tablename",
                                "value": {
                                    "value": "@item().name",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Set schema",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "variableName": "Schemaname",
                                "value": {
                                    "value": "@item().schema",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "lakehouseId": {
                "type": "string",
                "defaultValue": "0f0f6b7c-1761-41e6-896e-30014f16ff6d"
            },
            "tablesToCopy": {
                "type": "array",
                "defaultValue": [
                    {
                        "schema": "dbo",
                        "name": "Date"
                    },
                    {
                        "schema": "dbo",
                        "name": "Geography"
                    },
                    {
                        "schema": "dbo",
                        "name": "HackneyLicense"
                    },
                    {
                        "schema": "dbo",
                        "name": "Medallion"
                    },
                    {
                        "schema": "dbo",
                        "name": "Time"
                    },
                    {
                        "schema": "dbo",
                        "name": "Trip"
                    },
                    {
                        "schema": "dbo",
                        "name": "Weather"
                    }
                ]
            },
            "workspaceId": {
                "type": "string",
                "defaultValue": "1501143c-272f-4a2f-976a-7e55971e4c2b"
            },
            "warehouseId": {
                "type": "string",
                "defaultValue": "4d1bd951-99de-4bd7-b7bc-71c8f56db411"
            },
            "warehouseConnStr": {
                "type": "string",
                "defaultValue": "72wwbivi2ubejbrtmtaho32b4y-hqkacfjpe4xuvf3kpzkzohsmfm.datawarehouse.fabric.microsoft.com"
            },
            "lakehouseConnStr": {
                "type": "string",
                "defaultValue": "72wwbivi2ubejbrtmtaho32b4y-hqkacfjpe4xuvf3kpzkzohsmfm.datawarehouse.fabric.microsoft.com"
            }
        },
        "variables": {
            "Tablename": {
                "type": "String"
            },
            "Schemaname": {
                "type": "String"
            }
        },
        "lastModifiedByObjectId": "4aa20af7-94bd-4348-bef8-f8cbcd840d51",
        "lastPublishTime": "2024-11-13T15:52:52Z"
    }
}\", \n"," \"payloadType\": \"InlineBase64\" \n"," } \n"," ] \n"," } \n","} \n"," \n"," response = json.loads(client.post(dfurl,json= payload).content)\n"," return response['id']\n","\n","def getItemId(wks_id,itm_name,itm_type):\n"," df = fabric.list_items(type=None,workspace=wks_id)\n"," #print(df)\n"," if df.empty:\n"," return 'NotExists'\n"," else:\n"," #display(df)\n"," #print(df.query('\"Display Name\"=\"'+itm_name+'\"'))\n"," if itm_type != '':\n"," newdf= df.loc[(df['Display Name'] == itm_name) & (df['Type'] == itm_type)]['Id']\n"," else:\n"," newdf= df.loc[(df['Display Name'] == itm_name)]['Id'] \n"," if newdf.empty:\n"," return 'NotExists'\n"," else:\n"," return newdf.iloc[0]\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":true,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"e46210e9-58c9-483a-84ae-bbdc2ad1c37f"},{"cell_type":"markdown","source":["##### Either create shortcuts from source to target lakehouse(s) or copy data\n","\n","Loops through lakehouse(s) in the target workspace and either populates them with shortcuts or data\n","\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"a15065f3-670d-4bc9-b337-51709f6cdb1f"},{"cell_type":"code","source":["df_lhs = labs.list_lakehouses(source_ws)\n","for index, row in df_lhs.iterrows():\n","\n","\n"," if copy_lakehouse_data:\n"," df_lakehouses = (labs.list_lakehouses(source_ws))\n"," lh_name= row['Lakehouse Name']\n"," if lh_name.find('temp')==-1:\n"," # Gathers the list of recovers tables and source paths to be copied into the lakehouse associated with this notebook \n"," src_path = f'abfss://{source_ws}@onelake.dfs.fabric.microsoft.com/{lh_name}.Lakehouse'\n","\n"," table_list = get_lh_object_list(src_path)\n"," print(f'Attempting to copy table data for lakehouse {lh_name} from workspace {source_ws} to {target_ws}...')\n"," display(table_list)\n","\n"," #print('Copy Lakehouse Delta tables...')\n"," res = copy_lh_objects(table_list[table_list['type']=='table'],source_ws,target_ws,\n"," lh_name,lh_name,False,False)\n"," display(res)\n"," # Copy files\n"," print(f'Attempting to copy file data for lakehouse {lh_name} from workspace {source_ws} to {target_ws}...')\n","\n"," #print('Copy Lakehouse files...')\n"," res = copy_lh_objects(table_list[table_list['type']=='file'],source_ws,target_ws,\n"," lh_name,lh_name,False,False)\n"," display(res)\n"," print('Done.')\n","\n"," else:\n"," # fetch ID of source lakehouse based on name and workspace\n"," source_lh_id = fabric.resolve_item_id(\n"," item_name=row['Lakehouse Name'], type=\"Lakehouse\", workspace=source_ws\n"," )\n"," #target_lh_id = notebookutils.lakehouse.getWithProperties(name=current_lakehouse['default_lakehouse_name'], workspaceId=new_workspace_id)['id']\n","\n"," SOURCE_URI = f\"abfss://{source_ws_id}@onelake.dfs.fabric.microsoft.com/{source_lh_id}/Tables\"\n"," DEST_URI = f\"abfss://{target_ws_id}@onelake.dfs.fabric.microsoft.com/{row['Lakehouse ID']}/Tables\"\n","\n"," if PATTERN_MATCH is None or len(PATTERN_MATCH) == 0:\n"," raise TypeError(\"Argument 'PATTERN_MATCH' should be a valid list of patterns or [\"*\"] to match everything\")\n","\n"," # Collect created shortcuts\n"," result = []\n","\n"," # If either URI's are invalid, just return\n"," if not is_valid_onelake_uri(SOURCE_URI) or not is_valid_onelake_uri(DEST_URI):\n"," print(\n"," 
\"invalid URI's provided. URI's should be in the form: abfss://@onelake.dfs.fabric.microsoft.com//\"\n"," )\n"," else:\n"," # Remove any trailing '/' from uri's\n"," source_uri_addr = SOURCE_URI.rstrip(\"/\")\n"," dest_uri_addr = DEST_URI.rstrip(\"/\")\n","\n"," dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(\n"," dest_uri_addr\n"," )\n","\n"," # If we are not shortcutting to a managed table folder or\n"," # the source uri is a delta table, just shortcut it 1-1.\n"," if not dest_path.startswith(\"Tables\") or is_delta_table(source_uri_addr):\n"," shortcut = create_onelake_shorcut(source_uri_addr, dest_uri_addr)\n"," if shortcut is not None:\n"," result.append(shortcut)\n"," else:\n"," # If source is not a delta table, and destination is managed table folder:\n"," # Iterate over source folders and create table shortcuts @ destination\n"," for delta_table_uri in get_matching_delta_tables_uris(\n"," source_uri_addr, PATTERN_MATCH\n"," ):\n"," shortcut = create_onelake_shorcut(delta_table_uri, dest_uri_addr)\n"," if shortcut is not None:\n"," result.append(shortcut)\n"," print(result)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"8bec3fbc-4a75-4ba3-86d5-0620ec504a8f"},{"cell_type":"markdown","source":["##### Copy warehouse data via parameterised pipeline\n","\n","Loop through all warehouses and copy the data"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"f198b816-77e9-4f04-9139-d78237bedc72"},{"cell_type":"code","source":["p_logging_verbose = True\n","df_warehouses = (labs.list_warehouses(target_ws))\n","display(df_warehouses)\n","for index, row in df_warehouses.iterrows():\n"," source_wh_id = labs.resolve_warehouse_id(row['Warehouse Name'],source_ws_id)\n"," target_wh_id = labs.resolve_warehouse_id(row['Warehouse Name'],target_ws_id)\n"," \n"," src_path = f'abfss://'+source_ws_id+'@onelake.dfs.fabric.microsoft.com/'+source_wh_id\n"," tgt_path = f'abfss://'+target_ws_id+'@onelake.dfs.fabric.microsoft.com/'+target_wh_id\n","\n"," # extract the list of schemas per data \n"," schema_list = get_lh_object_list(src_path,['Tables'])\n"," # extract a list of warehouse objects per schema and store in a list\n"," table_list = get_wh_object_list(schema_list['name'],src_path)\n"," \n"," # create a temporary staging lakehouse per warehouse to create shortcuts into, \n"," # which point back to original warehouse data currently in the DR storage account\n"," lhname = 'temp_rlh_' + source_ws+'_'+row['Warehouse Name']\n"," # check if it exists before attempting create\n"," if p_logging_verbose:\n"," print('Checking whether the temporary lakehouse \"'+ lhname +'\" exists in workspace '+target_ws+'...')\n"," temp_lh_id = getItemId(target_ws_id,lhname,'Lakehouse')\n"," if temp_lh_id == 'NotExists':\n"," lhname = lhname[:256] # lakehouse name should not exceed 256 characters\n"," payload = payload = '{\"displayName\": \"' + lhname + '\",' \\\n"," + '\"description\": \"Interim staging lakehouse for primary warehouse recovery: ' \\\n"," + source_ws+'_'+row['Warehouse Name'] + 'into workspace '+ target_ws + '(' + target_ws +')\"}'\n"," try:\n"," lhurl = \"v1/workspaces/\" + target_ws_id + \"/lakehouses\"\n"," lhresponse = client.post(lhurl,json= json.loads(payload))\n"," temp_lh_id = 
lhresponse.json()['id']\n"," if p_logging_verbose:\n"," print('Temporary lakehouse \"'+ lhname +'\" created with Id ' + temp_lh_id + ': ' + str(lhresponse.status_code) + ' ' + str(lhresponse.text))\n"," except Exception as error:\n"," print(error.errorCode)\n"," else:\n"," if p_logging_verbose:\n"," print('Temporary lakehouse '+lhname+' (' + temp_lh_id + ') already exists.')\n"," \n"," time.sleep(60) # waiting for temporary lakehouse to provision completely \n","\n"," # Create shortcuts for every table in the format of schema_table under the tables folder\n"," for index,itable in table_list.iterrows():\n"," shortcutExists=False\n"," # Check if shortcut exists\n"," try:\n"," url = \"v1/workspaces/\" + target_ws_id + \"/items/\" + temp_lh_id + \"/shortcuts/Tables/\"+itable['schema']+'_'+itable['name']\n"," tlhresponse = client.get(url)\n"," shortcutExists = True\n"," if p_logging_verbose:\n"," print('Shortcut '+itable['schema']+'_'+itable['name'] +' already exists')\n"," except Exception as error:\n"," shortcutExists = False \n","\n"," if not shortcutExists: \n"," # Create shortcuts - one per table per schema\n"," url = \"v1/workspaces/\" + target_ws_id + \"/items/\" + temp_lh_id + \"/shortcuts\"\n"," scpayload = '{' \\\n"," '\"path\": \"Tables/\",' \\\n"," '\"name\": \"'+itable['schema']+'_'+itable['name']+'\",' \\\n"," '\"target\": {' \\\n"," '\"oneLake\": {' \\\n"," '\"workspaceId\": \"' + source_ws_id + '\",' \\\n"," '\"itemId\": \"'+ source_wh_id +'\",' \\\n"," '\"path\": \"/Tables/' + itable['schema']+'/'+itable['name'] + '\"' \\\n"," '}}}' \n"," try:\n"," #print(scpayload) \n"," shctresponse = client.post(url,json= json.loads(scpayload))\n"," if p_logging_verbose:\n"," print('Shortcut '+itable['schema']+'_'+itable['name'] + ' created.' )\n","\n"," except Exception as error:\n"," print('Error creating shortcut '+itable['schema']+'_'+itable['name']+' due to '+str(error) + ':' + shctresponse.text)\n"," \n"," recovery_pipeline_prefix= 'plRecover_WH' \n"," # recovery pipeline name should not exceed 256 characters\n"," recovery_pipeline = recovery_pipeline_prefix+'_'+source_ws + '_'+row['Warehouse Name'][:256]\n"," if p_logging_verbose:\n"," print('Attempting to deploy a copy pipeline in the target workspace to load the target warehouse tables from the shortcuts created above... 
')\n"," # Create the pipeline in the target workspace that loads the target warehouse from shortcuts created above \n"," plid = getItemId( target_ws_id,recovery_pipeline,'DataPipeline')\n"," #print(plid)\n"," if plid == 'NotExists':\n"," plid = createDWrecoverypl(target_ws_id,recovery_pipeline_prefix+'_'+source_ws + '_'+row['Warehouse Name'])\n"," if p_logging_verbose:\n"," print('Recovery pipeline ' + recovery_pipeline + ' created with Id '+plid)\n"," else:\n"," if p_logging_verbose:\n"," print('Datawarehouse recovery pipeline \"' + recovery_pipeline + '\" ('+plid+') already exist in workspace \"'+target_ws + '\" ('+target_ws_id+')') \n"," print('\\n')\n","\n"," tablesToCopyParam = table_list[['schema','name']].to_json( orient='records')\n"," # ensure the temporary lakehouse exists\n","\n"," # obtain the connection string for the lakehouse to pass to the copy pipeline\n"," whurl = \"v1/workspaces/\" + target_ws_id + \"/lakehouses/\" + temp_lh_id\n"," whresponse = client.get(whurl)\n"," lhconnStr = whresponse.json()['properties']['sqlEndpointProperties']['connectionString']\n","\n"," # get the SQLEndpoint ID of the lakehouse to pass to the copy pipeline\n"," items = fabric.list_items(workspace=target_ws_id)\n"," print(items)\n"," temp_lh_sqle_id = items[(items['Type'] == 'SQLEndpoint') & (items['Display Name']==lhname)]['Id'].values[0]\n","\n","\n"," # obtain the connection string for the warehouse to pass to the copy pipeline \n"," whurl = \"v1/workspaces/\" + target_ws_id + \"/warehouses/\" + target_wh_id\n"," whresponse = client.get(whurl)\n"," whconnStr = whresponse.json()['properties']['connectionInfo']\n","\n"," # obtain the pipeline id created to recover this warehouse\n"," plid = getItemId( target_ws_id,recovery_pipeline,'DataPipeline')\n"," if plid == 'NotExists':\n"," print('Error: Could not execute pipeline '+recovery_pipeline+ ' as the ID could not be obtained ')\n"," else:\n"," # pipeline url including pipeline Id unique to each warehouse\n"," plurl = 'v1/workspaces/'+target_ws_id+'/items/'+plid+'/jobs/instances?jobType=Pipeline'\n"," #print(plurl)\n","\n"," payload_data = '{' \\\n"," '\"executionData\": {' \\\n"," '\"parameters\": {' \\\n"," '\"lakehouseId\": \"' + temp_lh_sqle_id + '\",' \\\n"," '\"tablesToCopy\": ' + tablesToCopyParam + ',' \\\n"," '\"workspaceId\": \"' + target_ws_id +'\",' \\\n"," '\"warehouseId\": \"' + target_wh_id + '\",' \\\n"," '\"lakehouseConnStr\": \"' + lhconnStr + '\",' \\\n"," '\"warehouseConnStr\": \"' + whconnStr + '\"' \\\n"," '}}}'\n"," #print(payload_data)\n"," plresponse = client.post(plurl, json=json.loads(payload_data))\n"," if p_logging_verbose:\n"," print(str(plresponse.status_code)) \n","print('Done')\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"57dafef7-17a2-475f-9e62-eecc6660440c"},{"cell_type":"markdown","source":["###### Update direct lake model lakehouse connection\n","\n","https://semantic-link-labs.readthedocs.io/en/stable/sempy_labs.directlake.html#sempy_labs.directlake.update_direct_lake_model_lakehouse_connection\n"," "],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"cc97be77-116e-4cde-bdc6-2971ab98a083"},{"cell_type":"code","source":["\n","df_datasets = fabric.list_datasets(target_ws)\n","\n","# Iterate over each dataset in the 
dataframe\n","for index, row in df_datasets.iterrows():\n"," # Check if the dataset is not the default semantic model\n"," if not labs.is_default_semantic_model(row['Dataset Name'], fabric.resolve_workspace_id(target_ws)):\n"," print('Updating semantic model connection ' + row['Dataset Name'] + ' in workspace '+ target_ws)\n"," labs.directlake.update_direct_lake_model_connection(dataset=row['Dataset Name'], \n"," workspace= target_ws,\n"," source=labs.directlake.get_direct_lake_source(row['Dataset Name'], workspace= target_ws)[1], \n"," source_type=labs.directlake.get_direct_lake_source(row['Dataset Name'], workspace= target_ws)[0], \n"," source_workspace=target_ws)\n"," labs.refresh_semantic_model(dataset=row['Dataset Name'], workspace= target_ws)\n","\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"9deccda6-5c3d-4b88-8ed8-68855ca0949a"},{"cell_type":"markdown","source":["##### Rebind reports in new branch workspace\n","\n","https://semantic-link-labs.readthedocs.io/en/latest/sempy_labs.report.html#sempy_labs.report.report_rebind"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"36783f3b-4904-4d74-842d-dbd026a3184a"},{"cell_type":"code","source":["df_reports = fabric.list_reports(workspace=target_ws)\n","for index, row in df_reports.iterrows():\n"," #print(row['Name'] + '-' + row['Dataset Id'])\n"," df_datasets = fabric.list_datasets(workspace=target_ws)\n"," dataset_name = df_datasets[df_datasets['Dataset ID'] == row['Dataset Id']]['Dataset Name'].values[0]\n"," print(f'Rebinding report to {dataset_name} in {target_ws}')\n"," labs.report.report_rebind(report=row['Name'],dataset=dataset_name, report_workspace=target_ws, dataset_workspace=target_ws)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"06268ede-b795-493e-9a8d-772654ce7e20"},{"cell_type":"markdown","source":["##### Update data pipeline source & sink connections\n","\n","Support changes lakehouses, warehouses, notebooks and connections from source to target.
\n","Connections changes should be expressed as an array of tuples [{from_1:to_1},{from_N:to_N}]"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"4ae65012-350c-40c0-a68a-4069c567a85f"},{"cell_type":"code","source":["from typing import Optional\n","from sempy_labs._helper_functions import (\n"," resolve_workspace_name_and_id,\n"," lro,\n"," _decode_b64,\n",")\n","import sempy_labs._icons as icons\n","\n","import base64\n","from typing import Optional, Tuple, List\n","from uuid import UUID\n","\n","\n","def update_data_pipeline_definition(\n"," name: str, pipeline_content: dict, workspace: Optional[str] = None\n","):\n"," \"\"\"\n"," Updates an existing data pipeline with a new definition.\n","\n"," Parameters\n"," ----------\n"," name : str\n"," The name of the data pipeline.\n"," pipeline_content : dict\n"," The data pipeline content (not in Base64 format).\n"," workspace : str, default=None\n"," The name of the workspace.\n"," Defaults to None which resolves to the workspace of the attached lakehouse\n"," or if no lakehouse attached, resolves to the workspace of the notebook.\n"," \"\"\"\n","\n"," (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)\n"," client = fabric.FabricRestClient()\n"," pipeline_payload = base64.b64encode(json.dumps(pipeline_content).encode('utf-8')).decode('utf-8')\n"," pipeline_id = fabric.resolve_item_id(\n"," item_name=name, type=\"DataPipeline\", workspace=workspace\n"," )\n","\n"," request_body = {\n"," \"definition\": {\n"," \"parts\": [\n"," {\n"," \"path\": \"pipeline-content.json\",\n"," \"payload\": pipeline_payload,\n"," \"payloadType\": \"InlineBase64\"\n"," }\n"," ]\n"," }\n"," }\n","\n","\n"," response = client.post(\n"," f\"v1/workspaces/{workspace_id}/items/{pipeline_id}/updateDefinition\",\n"," json=request_body,\n"," )\n","\n"," lro(client, response, return_status_code=True)\n","\n"," print(\n"," f\"{icons.green_dot} The '{name}' pipeline was updated within the '{workspace}' workspace.\"\n"," )\n","\n","def _is_valid_uuid(\n"," guid: str,\n","):\n"," \"\"\"\n"," Validates if a string is a valid GUID in version 4\n","\n"," Parameters\n"," ----------\n"," guid : str\n"," GUID to be validated.\n","\n"," Returns\n"," -------\n"," bool\n"," Boolean that indicates if the string is a GUID or not.\n"," \"\"\"\n","\n"," try:\n"," UUID(str(guid), version=4)\n"," return True\n"," except ValueError:\n"," return False"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"bdd46d4a-ef58-4f9a-b2e8-a428361a17c1"},{"cell_type":"code","source":["import json\n","from jsonpath_ng import jsonpath, parse\n","from typing import Optional, Tuple, List\n","from uuid import UUID\n","\n","source_ws = ''\n","target_ws = ''\n","\n","\n","# Swaps the connection properties of an activity belonging to the specified item type(s)\n","def swap_pipeline_connection(pl_json: dict, p_source_ws: str,p_target_ws: str, \n"," p_item_type: List =['DataWarehouse','Lakehouse','Notebook'], \n"," p_conn_id_from_to: Optional[List[Tuple[str,str]]]=[]):\n"," \n"," source_ws_id = fabric.resolve_workspace_id(source_ws)\n","\n"," target_ws_id = fabric.resolve_workspace_id(target_ws)\n","\n"," if 'Warehouse' in p_item_type or 'Lakehouse' in p_item_type:\n"," ls_expr = parse('$..linkedService')\n"," for endpoint_match in 
ls_expr.find(pl_json):\n"," if endpoint_match.value['properties']['type'] == 'DataWarehouse' \\\n"," and endpoint_match.value['properties']['typeProperties']['workspaceId'] == source_ws_id \\\n"," and 'Warehouse' in p_item_type:\n"," # only update the warehouse if it was located in the source workspace i.e. we will update the properties to the target workspace if the warehouse resided in the same workspace as the pipeline\n"," #print(endpoint_match.value)\n"," warehouse_id = endpoint_match.value['properties']['typeProperties']['artifactId']\n"," #print(warehouse_id)\n"," warehouse_endpoint = endpoint_match.value['properties']['typeProperties']['endpoint']\n"," #print(warehouse_endpoint)\n"," \n"," source_wh_name = fabric.resolve_item_name(item_id = warehouse_id,workspace=source_ws_id)\n"," #print(remote_wh_name)\n"," # find the warehouse id of the warehouse with the same name in the target workspace\n"," target_wh_id = fabric.resolve_item_id(item_name = source_wh_name,type='Warehouse',workspace=target_ws_id)\n"," # look up the connection string for the warehouse in the target workspace\n"," whurl = f\"v1/workspaces/{target_ws_id}/warehouses/{target_wh_id}\"\n"," whresponse = client.get(whurl)\n"," lhconnStr = whresponse.json()['properties']['connectionString']\n"," endpoint_match.value['properties']['typeProperties']['artifactId'] = target_wh_id\n"," endpoint_match.value['properties']['typeProperties']['workspaceId'] = target_ws_id\n"," endpoint_match.value['properties']['typeProperties']['endpoint'] = lhconnStr\n"," #print(endpoint_match.value)\n"," ls_expr.update(endpoint_match,endpoint_match.value)\n"," if endpoint_match.value['properties']['type'] == 'Lakehouse' \\\n"," and endpoint_match.value['properties']['typeProperties']['workspaceId'] == source_ws_id \\\n"," and 'Lakehouse' in p_item_type:\n"," #print(endpoint_match.value)\n"," lakehouse_id = endpoint_match.value['properties']['typeProperties']['artifactId']\n"," remote_lh_name = fabric.resolve_item_name(item_id = lakehouse_id,workspace=source_ws_id)\n"," # find the lakehouse id of the lakehouse with the same name in the target workspace\n"," target_lh_id = fabric.resolve_item_id(item_name = remote_lh_name,type='Lakehouse',workspace=target_ws_id)\n"," endpoint_match.value['properties']['typeProperties']['artifactId'] = target_lh_id\n"," endpoint_match.value['properties']['typeProperties']['workspaceId'] = target_ws_id\n"," ls_expr.update(endpoint_match,endpoint_match.value)\n"," # print(endpoint_match.value)\n","\n","\n"," if 'Notebook' in p_item_type: \n"," ls_expr = parse('$..activities')\n","\n"," for endpoint_match in ls_expr.find(pl_json):\n"," for activity in endpoint_match.value:\n"," #print(activity['type'])\n"," if activity['type']=='TridentNotebook' and 'Notebook' in p_item_type: #only update if the notebook was in the same workspace as the pipeline\n"," print('change from '+activity['typeProperties']['workspaceId'])\n"," source_nb_id = activity['typeProperties']['notebookId']\n"," source_nb_name = fabric.resolve_item_name(item_id = source_nb_id,workspace=source_ws_id)\n"," target_nb_id = fabric.resolve_item_id(item_name = source_nb_name,type='Notebook',workspace=target_ws_id)\n"," activity['typeProperties']['notebookId']=target_nb_id\n"," activity['typeProperties']['workspaceId']=target_ws_id\n"," print('to notebook '+ target_nb_id)\n"," #ls_expr.update(endpoint_match,endpoint_match.value)\n","\n"," if p_conn_from_to:\n"," for ti_conn_from_to in p_conn_from_to:\n"," if not _is_valid_uuid(ti_conn_from_to[0]):\n"," 
print('Connection from is string '+ str(ti_conn_from_to[0]))\n"," dfC_filt = df_conns[df_conns[\"Connection Name\"] == ti_conn_from_to[0]] \n"," connId_from = dfC_filt['Connection Id'].iloc[0] \n"," else:\n"," connId_from = ti_conn_from_to[0]\n","\n"," if not _is_valid_uuid(ti_conn_from_to[1]):\n"," print('Connection from is string '+ str(ti_conn_from_to[1]))\n"," dfC_filt = df_conns[df_conns[\"Connection Name\"] == ti_conn_from_to[1]] \n"," connId_to = dfC_filt['Connection Id'].iloc[0] \n"," else:\n"," connId_to = ti_conn_from_to[1]\n","\n"," ls_expr = parse('$..externalReferences')\n"," for externalRef in ls_expr.find(pl_json):\n"," if externalRef.value['connection']==connId_from:\n"," print('Changing connection from '+str(connId_from))\n"," externalRef.value['connection']=connId_to\n"," ls_expr.update(externalRef,externalRef.value)\n"," print('to '+str(connId_to))\n","\n"," return pl_json\n","\n","\n","\n","# loading a dataframe of connections to perform an ID lookup if required \n","df_conns = labs.list_connections()\n","\n","df_pipeline = labs.list_data_pipelines(target_ws)\n","for index, row in df_pipeline.iterrows():\n"," #print(labs.get_data_pipeline_definition(row['Data Pipeline Name'],target_ws))\n"," if row['Data Pipeline Name']=='plRecover_WH6_Prod2_Warehouse2_fixed':\n"," pipeline_json = json.loads(labs.get_data_pipeline_definition(row['Data Pipeline Name'],source_ws))\n","\n"," p_new_json = swap_pipeline_connection(pipeline_json, source_ws,target_ws,\n"," ['DataWarehouse','Lakehouse','Notebook'],\n"," [p_connections_from_to]) \n"," #print(json.dumps(pipeline_json, indent=4))\n"," \n"," update_data_pipeline_definition(name=row['Data Pipeline Name'],pipeline_content=pipeline_json, workspace=target_ws)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"079958e8-2880-484a-a994-41caf47e747e"},{"cell_type":"markdown","source":["##### Commit changes made above to Git"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"44174276-b983-4e80-9451-0afb9589cf1f"},{"cell_type":"code","source":["labs.commit_to_git(comment='Initial', workspace=target_ws)"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"9a5c3d84-f71d-4348-b419-c4953ac9e1d0"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","language":"Python","display_name":"Synapse PySpark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"widgets":{},"nteract":{"version":"nteract-front-end@1.0.0"},"synapse_widget":{"version":"0.1","state":{}},"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{"spark.synapse.nbs.session.timeout":"1200000"}}},"dependencies":{"lakehouse":{}}},"nbformat":4,"nbformat_minor":5} diff --git a/governance/CICD/branch-out-to-new-workspace/README.md b/governance/CICD/branch-out-to-new-workspace/README.md new file mode 100644 index 0000000..ad25412 --- /dev/null +++ b/governance/CICD/branch-out-to-new-workspace/README.md @@ -0,0 +1 @@ +For instructions please refer to the associated PDF in this folder.
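A few illustrative notes on the post-activity notebook added in this change. When lakehouse data is not copied outright, the notebook creates one OneLake shortcut per warehouse table inside a temporary staging lakehouse by posting a JSON payload to the Fabric shortcuts REST endpoint. The snippet below is a minimal sketch of that payload built as a Python dict rather than a concatenated string; every GUID and the schema/table names are placeholders, not values from this repository.

```python
import sempy.fabric as fabric

# Placeholders -- substitute real workspace/item GUIDs and table names.
target_ws_id = "<target-workspace-guid>"
temp_lh_id = "<temp-staging-lakehouse-guid>"
source_ws_id = "<source-workspace-guid>"
source_wh_id = "<source-warehouse-guid>"
schema, table = "dbo", "DimCustomer"

client = fabric.FabricRestClient()

# One shortcut per warehouse table, named "<schema>_<table>" under Tables/,
# pointing back at the table in the source warehouse.
shortcut_payload = {
    "path": "Tables/",
    "name": f"{schema}_{table}",
    "target": {
        "oneLake": {
            "workspaceId": source_ws_id,
            "itemId": source_wh_id,
            "path": f"/Tables/{schema}/{table}",
        }
    },
}
response = client.post(
    f"v1/workspaces/{target_ws_id}/items/{temp_lh_id}/shortcuts",
    json=shortcut_payload,
)
print(response.status_code)
```

Building the payload as a dict and letting `json=` serialize it avoids the quoting mistakes that hand-assembled JSON strings are prone to.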
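Once the shortcuts exist, the notebook starts the parameterised warehouse-recovery pipeline as an on-demand job run. The sketch below shows the shape of that request only; every identifier and connection string is a placeholder for the values the notebook looks up (the staging lakehouse's SQL endpoint ID and connection string, and the target warehouse ID and connection string).

```python
import sempy.fabric as fabric

client = fabric.FabricRestClient()

run_payload = {
    "executionData": {
        "parameters": {
            "lakehouseId": "<staging-lakehouse-sql-endpoint-guid>",
            "tablesToCopy": [{"schema": "dbo", "name": "DimCustomer"}],
            "workspaceId": "<target-workspace-guid>",
            "warehouseId": "<target-warehouse-guid>",
            "lakehouseConnStr": "<staging-lakehouse-sql-connection-string>",
            "warehouseConnStr": "<target-warehouse-connection-string>",
        }
    }
}

# Start the recovery data pipeline as a job instance in the target workspace.
run_response = client.post(
    "v1/workspaces/<target-workspace-guid>/items/<recovery-pipeline-guid>"
    "/jobs/instances?jobType=Pipeline",
    json=run_payload,
)
print(run_response.status_code)
```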
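Finally, the notebook rewrites data pipeline definitions so that warehouse, lakehouse, notebook and external-connection references point at the branched-out workspace; connection swaps are supplied as `(from, to)` pairs that can be either connection names or connection IDs. The following is a minimal usage sketch, assuming the notebook's `swap_pipeline_connection` and `update_data_pipeline_definition` helpers are in scope and `sempy_labs` is imported as `labs`, as in the notebook; the workspace names, pipeline name and connection values are placeholders.

```python
import json

source_ws, target_ws = "<source-workspace-name>", "<target-workspace-name>"
pipeline_name = "<data-pipeline-name>"

# Fetch the pipeline definition from the source workspace, retarget its
# item and connection references, then push the updated definition into
# the branched-out workspace.
pipeline_json = json.loads(labs.get_data_pipeline_definition(pipeline_name, source_ws))
new_json = swap_pipeline_connection(
    pipeline_json,
    source_ws,
    target_ws,
    p_item_type=["DataWarehouse", "Lakehouse", "Notebook"],
    p_conn_id_from_to=[("<from-connection-name-or-guid>", "<to-connection-name-or-guid>")],
)
update_data_pipeline_definition(
    name=pipeline_name, pipeline_content=new_json, workspace=target_ws
)
```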