diff --git a/data-warehouse/notebooks/create-llist-delete-warehouse.ipynb b/data-warehouse/notebooks/create-llist-delete-warehouse.ipynb index 8f530ab..5d17989 100644 --- a/data-warehouse/notebooks/create-llist-delete-warehouse.ipynb +++ b/data-warehouse/notebooks/create-llist-delete-warehouse.ipynb @@ -1 +1,224 @@ -{"cells":[{"cell_type":"code","source":["# details from Article : https://learn.microsoft.com/en-us/fabric/data-warehouse/collation\n","# default collation is Latin1_General_100_BIN2_UTF8\n","# new collation is Latin1_General_100_CI_AS_KS_WS_SC_UTF8\n","\n","#REST API : https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/create-warehouse?tabs=HTTP\n","\n","#sempy version 0.4.0 or higher\n","!pip install semantic-link --q \n","import json\n","import sempy.fabric as fabric\n","from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException\n","\n","workspace_id=spark.conf.get(\"trident.workspace.id\")\n","\n","#Instantiate the client\n","client = fabric.FabricRestClient()\n","\n","uri = f\"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items\"\n","payload = { \n"," \"type\": \"Warehouse\", \n"," \"displayName\": \"marktest\", \n"," \"description\": \"New warehouse with case-insensitive collation\", \n"," \"creationPayload\": { \n"," \"defaultCollation\": \"Latin1_General_100_CI_AS_KS_WS_SC_UTF8\" \n"," } \n","}\n","\n","# Call the REST API\n","response = client.post(uri,json= payload)\n","display(response)\n","\n","#data = json.loads(response.text)\n","#display(data)"],"outputs":[{"output_type":"display_data","data":{"application/vnd.livy.statement-meta+json":{"spark_pool":null,"statement_id":74,"statement_ids":[74],"state":"finished","livy_statement_state":"available","session_id":"65aa4f49-978d-45fd-b906-01addd1f27e9","normalized_state":"finished","queued_time":"2024-11-30T12:09:02.1534597Z","session_start_time":null,"execution_start_time":"2024-11-30T12:09:06.9337526Z","execution_finish_time":"2024-11-30T12:09:11.7107053Z","parent_msg_id":"0f55a180-bb20-4af1-ba70-00fa37c019a0"},"text/plain":"StatementMeta(, 65aa4f49-978d-45fd-b906-01addd1f27e9, 74, Finished, Available, Finished)"},"metadata":{}},{"output_type":"display_data","data":{"text/plain":""},"metadata":{}}],"execution_count":72,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"64c0e7fb-eff6-4544-aa5f-2b80354243ae"},{"cell_type":"code","source":["# RESPI API : https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/list-warehouses?tabs=HTTP\n","\n","import json\n","import sempy.fabric as fabric\n","from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException\n","import time\n","target_displayname = 'marktest'\n","\n","workspace_id=spark.conf.get(\"trident.workspace.id\")\n","\n","#Instantiate the client\n","client = fabric.FabricRestClient()\n","\n","statusuri = f\"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/warehouses\"\n","matching_id = None\n","\n","while(matching_id is None):\n"," statusresponsedata = client.get(statusuri).json()\n"," datar = statusresponsedata['value']\n"," for item in datar:\n"," whName = item['displayName']\n"," if whName == target_displayname:\n"," matching_id = item['id']\n"," break\n"," \n"," display(\"Waiting....\")\n"," time.sleep(1)\n","\n","display(f\"Warehouse id is {matching_id}\")\n","display(\"Warehouse 
details:\")\n","print(item)"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"41e98e7c-42fc-4521-b7e5-6db5a1656143"},{"cell_type":"code","source":["# RESPI API : https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/get-warehouse?tabs=HTTP\n","\n","import json\n","import sempy.fabric as fabric\n","from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException\n","\n","workspace_id=spark.conf.get(\"trident.workspace.id\")\n","\n","#Instantiate the client\n","client = fabric.FabricRestClient()\n","\n","#matchind_id = 'bd3bb97e-8255-4b33-8ac2-8f63ec53fd23' \n","\n","statusuri = f\"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/warehouses/{matching_id}\"\n","\n","statusresponsedata = client.get(statusuri).json()\n","display(statusresponsedata)\n","\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"49901dcb-bdd1-4036-920c-42bccf023638"},{"cell_type":"code","source":["# REST API: https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/delete-warehouse?tabs=HTTP\n","\n","import json\n","import sempy.fabric as fabric\n","from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException\n","\n","workspace_id=spark.conf.get(\"trident.workspace.id\")\n","#Instantiate the client\n","client = fabric.FabricRestClient()\n","\n","uri = f\"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/warehouses/{matching_id}\"\n","\n","\n","# Call the REST API\n","response = client.delete(uri)\n","display(response)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"0c9a011d-e4c9-4365-8f93-9cc3927d768e"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","language":"Python","display_name":"Synapse PySpark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{"spark.synapse.nbs.session.timeout":"1200000"}}},"synapse_widget":{"version":"0.1","state":{}},"dependencies":{}},"nbformat":4,"nbformat_minor":5} \ No newline at end of file +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "64c0e7fb-eff6-4544-aa5f-2b80354243ae", + "metadata": { + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + } + }, + "outputs": [], + "source": [ + "# details from Article : https://learn.microsoft.com/en-us/fabric/data-warehouse/collation\n", + "# default collation is Latin1_General_100_BIN2_UTF8\n", + "# new collation is Latin1_General_100_CI_AS_KS_WS_SC_UTF8\n", + "\n", + "#REST API : https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/create-warehouse?tabs=HTTP\n", + "\n", + "#sempy version 0.4.0 or higher\n", + "!pip install semantic-link --q \n", + "import json\n", + "import sempy.fabric as fabric\n", + "from sempy.fabric.exceptions import FabricHTTPException, 
WorkspaceNotFoundException\n",
+    "\n",
+    "workspace_id=spark.conf.get(\"trident.workspace.id\")\n",
+    "\n",
+    "#Instantiate the client\n",
+    "client = fabric.FabricRestClient()\n",
+    "\n",
+    "uri = f\"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items\"\n",
+    "payload = { \n",
+    "  \"type\": \"Warehouse\", \n",
+    "  \"displayName\": \"marktest\", \n",
+    "  \"description\": \"New warehouse with case-insensitive collation\", \n",
+    "  \"creationPayload\": { \n",
+    "    \"defaultCollation\": \"Latin1_General_100_CI_AS_KS_WS_SC_UTF8\" \n",
+    "  } \n",
+    "}\n",
+    "\n",
+    "# Call the REST API\n",
+    "response = client.post(uri, json=payload)\n",
+    "display(response)\n",
+    "\n",
+    "#data = json.loads(response.text)\n",
+    "#display(data)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "41e98e7c-42fc-4521-b7e5-6db5a1656143",
+   "metadata": {
+    "collapsed": false,
+    "jupyter": {
+     "outputs_hidden": false,
+     "source_hidden": false
+    },
+    "microsoft": {
+     "language": "python",
+     "language_group": "synapse_pyspark"
+    },
+    "nteract": {
+     "transient": {
+      "deleting": false
+     }
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# REST API : https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/list-warehouses?tabs=HTTP\n",
+    "\n",
+    "import json\n",
+    "import sempy.fabric as fabric\n",
+    "from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException\n",
+    "import time\n",
+    "target_displayname = 'marktest'\n",
+    "\n",
+    "workspace_id=spark.conf.get(\"trident.workspace.id\")\n",
+    "\n",
+    "#Instantiate the client\n",
+    "client = fabric.FabricRestClient()\n",
+    "\n",
+    "statusuri = f\"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/warehouses\"\n",
+    "matching_id = None\n",
+    "\n",
+    "while(matching_id is None):\n",
+    "    statusresponsedata = client.get(statusuri).json()\n",
+    "    datar = statusresponsedata['value']\n",
+    "    for item in datar:\n",
+    "        whName = item['displayName']\n",
+    "        if whName == target_displayname:\n",
+    "            matching_id = item['id']\n",
+    "            break\n",
+    "    \n",
+    "    display(\"Waiting....\")\n",
+    "    time.sleep(1)\n",
+    "\n",
+    "display(f\"Warehouse id is {matching_id}\")\n",
+    "display(\"Warehouse details:\")\n",
+    "print(item)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "49901dcb-bdd1-4036-920c-42bccf023638",
+   "metadata": {
+    "jupyter": {
+     "outputs_hidden": false,
+     "source_hidden": false
+    },
+    "microsoft": {
+     "language": "python",
+     "language_group": "synapse_pyspark"
+    },
+    "nteract": {
+     "transient": {
+      "deleting": false
+     }
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# REST API : https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/get-warehouse?tabs=HTTP\n",
+    "\n",
+    "import json\n",
+    "import sempy.fabric as fabric\n",
+    "from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException\n",
+    "\n",
+    "workspace_id=spark.conf.get(\"trident.workspace.id\")\n",
+    "\n",
+    "#Instantiate the client\n",
+    "client = fabric.FabricRestClient()\n",
+    "\n",
+    "#matching_id = 'bd3bb97e-8255-4b33-8ac2-8f63ec53fd23' \n",
+    "\n",
+    "statusuri = f\"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/warehouses/{matching_id}\"\n",
+    "\n",
+    "statusresponsedata = client.get(statusuri).json()\n",
+    "display(statusresponsedata)\n",
+    "\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0c9a011d-e4c9-4365-8f93-9cc3927d768e",
+   "metadata": {
+    "jupyter": {
+     "outputs_hidden": false,
+     "source_hidden": false
+    },
+    "microsoft": {
+     "language": "python",
"python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "# REST API: https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/delete-warehouse?tabs=HTTP\n", + "\n", + "import json\n", + "import sempy.fabric as fabric\n", + "from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException\n", + "\n", + "workspace_id=spark.conf.get(\"trident.workspace.id\")\n", + "#Instantiate the client\n", + "client = fabric.FabricRestClient()\n", + "\n", + "uri = f\"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/warehouses/{matching_id}\"\n", + "\n", + "\n", + "# Call the REST API\n", + "response = client.delete(uri)\n", + "display(response)\n" + ] + } + ], + "metadata": { + "dependencies": {}, + "kernel_info": { + "name": "synapse_pyspark" + }, + "kernelspec": { + "display_name": "Synapse PySpark", + "language": "Python", + "name": "synapse_pyspark" + }, + "language_info": { + "name": "python" + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark", + "ms_spell_check": { + "ms_spell_check_language": "en" + } + }, + "nteract": { + "version": "nteract-front-end@1.0.0" + }, + "spark_compute": { + "compute_id": "/trident/default", + "session_options": { + "conf": { + "spark.synapse.nbs.session.timeout": "1200000" + } + } + }, + "synapse_widget": { + "state": {}, + "version": "0.1" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/open-mirroring/GenericMirroring/DatabaseConfig.cs b/open-mirroring/GenericMirroring/DatabaseConfig.cs new file mode 100644 index 0000000..dbe95d9 --- /dev/null +++ b/open-mirroring/GenericMirroring/DatabaseConfig.cs @@ -0,0 +1,119 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Http.Headers; +using System.Text; +using System.Threading.Tasks; + +namespace SQLMirroring +{ + public class DatabaseConfig + { + public string ConnectionString { get; set; } + public string Type { get; set; } + + public string ChangeTrackingEnabled { get; set; } + + public int syncVersion { get; set; } + public List Tables { get; set; } + + public string LocalLocationforTables { get; set; } + public string DatabaseName { get; set; } + public string ChangeTrackingSQL { get; set; } + public string ChangeTrackingTable { get; set; } + + public string Highwatermark { get; set; } + public string HighwatermarkSQL { get; set; } + + public string ChangeIncrementalSQL { get; set; } + + public string FullDataExtractQuery { get; set; } + } + + public class TableConfig + { + public string TableName { get; set; } + public string SchemaName { get; set; } + public string Status { get; set; } + + public DateTime LastUpdate { get; set; } + public int SecondsBetweenChecks { get; set; } + + public string KeyColumn { get; set; } + public string OtherColumns { get; set; } + public string AdditionalColumns { get; set; } + + public string DeltaVersion { get; set; } + + public string SoftDelete { get; set; } + + } + + public class Root + { + public List SQLChangeTrackingConfig { get; set; } + + public ExcelConfig ExcelMirroringConfig { get; set; } + + public List uploadDetails { get; set; } + + public string LocationLogging { get; set; } + + public AccessConfig AccessMirroringConfig { get; set; } + + public CSVConfig CSVMirroringConfig { get; set; } + + public Gen2Config Gen2MirroringConfig { get; set; } + + public SharepointConfig SharepointMirroringConfig { get; set; } + + public Root() + { + uploadDetails = new 
+        }
+    }
+
+    public class UploadDetails
+    {
+        public string SPN_Application_ID { get; set; }
+        public string SPN_Secret { get; set; }
+        public string SPN_Tenant { get; set; }
+        public string LandingZone { get; set; }
+
+        public string PathtoAZCopy { get; set; }
+    }
+
+    public class ExcelConfig
+    {
+        public string folderToWatch { get; set; }
+        public string outputFolder { get; set; }
+
+    }
+
+    public class CSVConfig
+    {
+        public string folderToWatch { get; set; }
+        public string outputFolder { get; set; }
+
+    }
+
+    public class AccessConfig
+    {
+        public string folderToWatch { get; set; }
+        public string outputFolder { get; set; }
+
+        public Boolean IncludeFolders { get; set; }
+
+    }
+
+    public class Gen2Config
+    {
+        public string ConnectionString { get; set; }
+    }
+
+    public class SharepointConfig
+    {
+
+    }
+
+}
diff --git a/open-mirroring/GenericMirroring/GenericMirroring.csproj b/open-mirroring/GenericMirroring/GenericMirroring.csproj
new file mode 100644
index 0000000..1d7bcd9
--- /dev/null
+++ b/open-mirroring/GenericMirroring/GenericMirroring.csproj
@@ -0,0 +1,30 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net8.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <!-- The PackageReference entries were stripped from this hunk; the using directives
+         in this PR imply ParquetSharp, EPPlus, Microsoft.Data.SqlClient, System.Data.OleDb
+         and Microsoft.Identity.Client (versions not recoverable). -->
+  </ItemGroup>
+
+  <ItemGroup>
+    <None Update="mirrorconfig.json">
+      <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+    </None>
+    <None Update="azcopy.exe">
+      <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+    </None>
+    <!-- The original hunk marks a third item CopyToOutputDirectory=Always; its Include path is not recoverable. -->
+  </ItemGroup>
+
+</Project>
diff --git a/open-mirroring/GenericMirroring/GenericMirroring.sln b/open-mirroring/GenericMirroring/GenericMirroring.sln
new file mode 100644
index 0000000..e6a9a55
--- /dev/null
+++ b/open-mirroring/GenericMirroring/GenericMirroring.sln
@@ -0,0 +1,25 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 17
+VisualStudioVersion = 17.11.35431.28
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GenericMirroring", "GenericMirroring.csproj", "{54AE3110-2E1B-4D58-AA2B-9FFA4DE9E559}"
+EndProject
+Global
+	GlobalSection(SolutionConfigurationPlatforms) = preSolution
+		Debug|Any CPU = Debug|Any CPU
+		Release|Any CPU = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(ProjectConfigurationPlatforms) = postSolution
+		{54AE3110-2E1B-4D58-AA2B-9FFA4DE9E559}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{54AE3110-2E1B-4D58-AA2B-9FFA4DE9E559}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{54AE3110-2E1B-4D58-AA2B-9FFA4DE9E559}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{54AE3110-2E1B-4D58-AA2B-9FFA4DE9E559}.Release|Any CPU.Build.0 = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(SolutionProperties) = preSolution
+		HideSolutionNode = FALSE
+	EndGlobalSection
+	GlobalSection(ExtensibilityGlobals) = postSolution
+		SolutionGuid = {B6647F31-0888-4C6F-9053-5826D62C402C}
+	EndGlobalSection
+EndGlobal
diff --git a/open-mirroring/GenericMirroring/Logging.cs b/open-mirroring/GenericMirroring/Logging.cs
new file mode 100644
index 0000000..3bfe0a9
--- /dev/null
+++ b/open-mirroring/GenericMirroring/Logging.cs
@@ -0,0 +1,31 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace GenericMirroring
+{
+
+    public static class Logging
+    {
+
+        public static string locallogging = string.Empty;
+
+
+        public static void Log(string message, string logLevel = "INFO")
+        {
+            string logMessage = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} [{logLevel}] {message}";
+            Console.WriteLine(logMessage);
+            try
+            {
+                File.AppendAllText(locallogging, logMessage + Environment.NewLine);
+            }
+            catch (Exception ex)
+            {
+                Console.WriteLine($"Failed to write log: {ex.Message}");
+            }
+        }
+
+    }
+}
diff --git a/open-mirroring/GenericMirroring/Parquet.cs b/open-mirroring/GenericMirroring/Parquet.cs
new file mode 100644
index 0000000..596514c
--- /dev/null
+++ b/open-mirroring/GenericMirroring/Parquet.cs
@@ -0,0 +1,121 @@
+using ParquetSharp;
+using System.Data;
+using GenericMirroring;
+
+namespace SQLMirroring
+{
+    public static class ParquetDump
+    {
+        static public void WriteDataTableToParquet(DataTable table, string filePath)
+        {
+            try
+            {
+                var columnFields = new Column[table.Columns.Count];
+                for (int i = 0; i < table.Columns.Count; i++)
+                {
+                    var column = table.Columns[i];
+                    var colType = column.DataType;
+                    columnFields[i] = new Column(colType, column.ColumnName);
+                    // Log($"CCC {column.ColumnName} {column.DataType}");
+                }
+                // Write Parquet file
+                using (var file = new FileStream(filePath, FileMode.Create, FileAccess.Write))
+                {
+                    using (var parquetWriter = new ParquetFileWriter(file, columnFields))
+                    {
+
+                        using (var rowGroupWriter = parquetWriter.AppendRowGroup())
+                        {
+                            // Write each column
+                            int colcount = 0;
+                            foreach (DataColumn column in table.Columns)
+                            {
+                                var columnData = new object[table.Rows.Count];
+                                for (int i = 0; i < table.Rows.Count; i++)
+                                {
+                                    columnData[i] = table.Rows[i][column];
+                                }
+
+                                try
+                                {
+                                    if (column.DataType == System.Type.GetType("System.Int32"))
+                                    {
+                                        int[] intArray = Array.ConvertAll(columnData, item => Convert.ToInt32(item ?? 0));
+                                        using (var valueWriter = rowGroupWriter.NextColumn().LogicalWriter<int>())
+                                        {
+                                            valueWriter.WriteBatch(intArray);
+                                        }
+                                    }
+                                    else if (column.DataType == System.Type.GetType("System.String"))
+                                    {
+                                        string[] stringArray = Array.ConvertAll(columnData, obj => obj?.ToString() ?? string.Empty);
+
+                                        // Convert to type-specific array
+                                        using (var valueWriter = rowGroupWriter.NextColumn().LogicalWriter<string>())
+                                        {
+                                            valueWriter.WriteBatch(stringArray);
+                                        }
+                                    }
+                                    else if (column.DataType == System.Type.GetType("System.DateTime"))
+                                    {
+                                        DateTime[] dateArray = Array.ConvertAll(columnData, item => Convert.ToDateTime(item));
+
+                                        // Convert to type-specific array
+                                        using (var valueWriter = rowGroupWriter.NextColumn().LogicalWriter<DateTime>())
+                                        {
+                                            valueWriter.WriteBatch(dateArray);
+                                        }
+                                    }
+                                    else if (column.DataType == System.Type.GetType("System.Object"))
+                                    {
+
+                                        string[] stringArray = Array.ConvertAll(columnData, obj => obj?.ToString() ?? string.Empty);
+                                        // Convert to type-specific array
+                                        using (var valueWriter = rowGroupWriter.NextColumn().LogicalWriter<string>())
+                                        {
+                                            valueWriter.WriteBatch(stringArray);
+                                        }
+                                    }
+                                    else
+                                    {
+                                        string[] stringArray = Array.ConvertAll(columnData, obj => obj?.ToString() ?? string.Empty);
+
+                                        // Convert to type-specific array
+                                        using (var valueWriter = rowGroupWriter.NextColumn().LogicalWriter<string>())
+                                        {
+                                            valueWriter.WriteBatch(stringArray);
+                                        }
+                                    }
+                                    colcount++;
+                                }
+                                catch (Exception ex)
+                                {
+                                    Logging.Log($"WriteDataTableToParquet:loop" +
+                                        $"-{ex.Message}");
+                                }
+                            }
+                        }
+                        parquetWriter.Close();
+                    }
+                }
+                Thread.Sleep(1000);
+            }
+            catch (Exception ex)
+            {
+                Logging.Log($"WriteDataTableToParquet-{ex.Message}");
+            }
+        }
+
+        static public void WriteDataTableToParquet(IDataReader rdr, string filePath)
+        {
+            DataTable dataTable = new DataTable();
+            dataTable.Load(rdr);
+
+            WriteDataTableToParquet(dataTable, filePath);
+
+        }
+
+
+    }
+}
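For reference, a minimal sketch of exercising `ParquetDump.WriteDataTableToParquet` on its own; the table shape and output path here are hypothetical, chosen so that each column lands in one of the typed writer branches above (`Int32`, `String`, `DateTime`; anything else falls back to the string branch):

```csharp
using System;
using System.Data;
using SQLMirroring;

class ParquetDumpDemo
{
    static void Main()
    {
        // Hypothetical table: one row per album, columns typed to match
        // the int/string/DateTime branches in WriteDataTableToParquet.
        var table = new DataTable("AlbumSales");
        table.Columns.Add("AlbumID", typeof(int));
        table.Columns.Add("AlbumName", typeof(string));
        table.Columns.Add("ReleaseDate", typeof(DateTime));

        table.Rows.Add(1, "Abbey Road", new DateTime(1969, 9, 26));
        table.Rows.Add(2, "Nevermind", new DateTime(1991, 9, 24));

        // Each DataTable column becomes one Parquet column in a single row group.
        ParquetDump.WriteDataTableToParquet(table, @"C:\temp\albums.parquet");
    }
}
```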
diff --git a/open-mirroring/GenericMirroring/Program.cs b/open-mirroring/GenericMirroring/Program.cs
new file mode 100644
index 0000000..bd61b0d
--- /dev/null
+++ b/open-mirroring/GenericMirroring/Program.cs
@@ -0,0 +1,335 @@
+using SQLMirroring;
+using System;
+using Microsoft.Data.SqlClient;
+using System.IO;
+using System.Data;
+using GenericMirroring;
+using GenericMirroring.sources;
+class Program
+{
+    private static Root config = null;
+    static void Main(string[] args)
+    {
+        #region Load the Config
+        // Path to the JSON configuration file
+        string configFilePath = "mirrorconfig.json";
+        // string configpath = System.IO.Path.GetDirectoryName(System.Reflection.Assembly.GetExecutingAssembly().Location);
+        string configpath = "C:\\source\\samples\\fabric-toolbox\\open-mirroring\\GenericMirroring";
+        string wholepath = string.Format("{0}\\{1}", configpath, configFilePath);
+        string loggingfilename = $"{DateTime.Now:yyyy-MM-dd_HH-mm-ss}_logging.txt";
+
+        // Read and parse configuration
+        //DatabaseConfig config = helper.LoadData(wholepath);
+        config = helper.LoadData<Root>(wholepath);
+        if (config == null)
+        {
+            Console.WriteLine("Failed to load configuration.");
+            return;
+        }
+
+        Console.WriteLine("Configuration loaded successfully.");
+        #endregion
+
+        helper.CreateFolders(config.LocationLogging);
+        Logging.locallogging = string.Format("{0}\\{1}", config.LocationLogging, loggingfilename);
+
+        #region Setup Excel Mirroring Watcher
+        if (config.ExcelMirroringConfig != null)
+        {
+            string folderToWatch = config.ExcelMirroringConfig.folderToWatch;
+            string outputfolder = config.ExcelMirroringConfig.outputFolder;
+
+            if (folderToWatch.Length > 0)
+            {
+                helper.CreateFolders(folderToWatch);
+            }
+
+            Logging.Log(String.Format("Excel Config : Watching folder: {0}", folderToWatch));
+
+            FileSystemWatcher watcher = new FileSystemWatcher();
+
+            // Set the directory to monitor
+            watcher.Path = folderToWatch;
+
+            // Optionally, filter for specific files or extensions (e.g., "*.txt" for text files)
+            watcher.Filters.Add("*.xlsx");
+
+            // Watch for specific changes (created, changed, deleted, renamed)
+            watcher.NotifyFilter = NotifyFilters.FileName |     // File name changes
+                                   NotifyFilters.LastWrite |    // File modifications
+                                   NotifyFilters.Size |         // File size changes
+                                   NotifyFilters.CreationTime |
+                                   NotifyFilters.Attributes;
+
+            watcher.Created += OnCreated;
+            watcher.Changed += OnCreated;
+            watcher.Renamed += OnCreated;
+
+            watcher.EnableRaisingEvents = true;
+
+            Logging.Log("Watching...");
+        }
+        else
+        {
+            Logging.Log("No Excel config.");
+        }
+        #endregion
+
+        #region Setup CSV Mirroring Watcher
+        if (config.CSVMirroringConfig != null)
+        {
+            string folderToWatch = config.CSVMirroringConfig.folderToWatch;
+            string outputfolder = config.CSVMirroringConfig.outputFolder;
+
+            if (folderToWatch.Length > 0)
+            {
+                helper.CreateFolders(folderToWatch);
+            }
+
+            Logging.Log(String.Format("CSV Config : Watching folder: {0}", folderToWatch));
+
+            FileSystemWatcher csvWatcher = new FileSystemWatcher();
+
+            // Set the directory to monitor
+            csvWatcher.Path = folderToWatch;
+
+            // Optionally, filter for specific files or extensions (e.g., "*.txt" for text files)
+            csvWatcher.Filters.Add("*.csv");
+
+            // Watch for specific changes (created, changed, deleted, renamed)
+            csvWatcher.NotifyFilter = NotifyFilters.FileName |     // File name changes
+                                      NotifyFilters.LastWrite |    // File modifications
+                                      NotifyFilters.Size |         // File size changes
+                                      NotifyFilters.CreationTime |
+                                      NotifyFilters.Attributes;
+
+            csvWatcher.Created += OnCreated;
+            csvWatcher.Changed += OnCreated;
+            csvWatcher.Renamed += OnCreated;
+
+            csvWatcher.EnableRaisingEvents = true;
+
+            Logging.Log("Watching...");
+        }
+        else
+        {
+            Logging.Log("No CSV config.");
+        }
+        #endregion
+
+        #region Setup Access Mirroring Watcher
+        if (config.AccessMirroringConfig != null)
+        {
+            string folderToWatch = config.AccessMirroringConfig.folderToWatch;
+            string outputfolder = config.AccessMirroringConfig.outputFolder;
+
+            if (folderToWatch.Length > 0)
+            {
+                helper.CreateFolders(folderToWatch);
+            }
+
+            Logging.Log(String.Format("Access Config : Watching folder: {0}", folderToWatch));
+
+            FileSystemWatcher accessWatcher = new FileSystemWatcher();
+
+            // Set the directory to monitor
+            accessWatcher.Path = folderToWatch;
+
+            // Optionally, filter for specific files or extensions (e.g., "*.txt" for text files)
+            accessWatcher.Filters.Add("*.accdb");
+
+            // Watch for specific changes (created, changed, deleted, renamed)
+            accessWatcher.NotifyFilter = NotifyFilters.FileName |     // File name changes
+                                         NotifyFilters.LastWrite |    // File modifications
+                                         NotifyFilters.Size |         // File size changes
+                                         NotifyFilters.CreationTime |
+                                         NotifyFilters.Attributes;
+
+            accessWatcher.Created += OnCreated;
+            accessWatcher.Changed += OnCreated;
+            accessWatcher.Renamed += OnCreated;
+
+            accessWatcher.EnableRaisingEvents = true;
+
+            Logging.Log("Watching...");
+        }
+        else
+        {
+            Logging.Log("No Access config.");
+        }
+        #endregion
+
+        // Power loop
+
+        while (true)
+        {
+            foreach (DatabaseConfig dbC in config.SQLChangeTrackingConfig)
+            {
+                #region SQL Change Tracking
+                if (dbC != null)
+                {
+                    if (!string.IsNullOrEmpty(dbC.ConnectionString))
+                    {
+                        // only run the SQL code, when there is a connection string.
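+                        // Per-table state machine used below: Status == null -> change
+                        // tracking gets enabled on the table; "Enabled" -> a full snapshot
+                        // is extracted, uploaded, and Status moves to "Running";
+                        // "Running" -> on each pass (throttled by SecondsBetweenChecks)
+                        // incremental changes since the stored Highwatermark are extracted
+                        // via CHANGETABLE and uploaded to every landing zone.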
+
+                        string databaseName = dbC.DatabaseName;
+                        string connectionString = helper.UpdateString(dbC.ConnectionString, databaseName);
+                        string ChangeTrackingSQL = helper.UpdateString(dbC.ChangeTrackingSQL, databaseName);
+                        string ChangeTrackingTable = dbC.ChangeTrackingTable;
+                        string LocalLocationforTables = dbC.LocalLocationforTables;
+
+
+                        if (dbC.ChangeTrackingEnabled == null || dbC.ChangeTrackingEnabled == string.Empty)
+                        {
+                            Logging.Log("Enabled CT at database.");
+                            SQLServer.ExecuteNonQuery(connectionString, ChangeTrackingSQL);
+                            dbC.ChangeTrackingEnabled = "Enabled";
+                            helper.SaveData(config, wholepath);
+                        }
+
+                        foreach (TableConfig table in dbC.Tables)
+                        {
+                            string tableName = string.Format("{0}.{1}", table.SchemaName, table.TableName);
+
+                            //Log("Scanning::{0}", tableName);
+
+                            if (table.LastUpdate == default) table.LastUpdate = DateTime.Now;
+                            if (table.SecondsBetweenChecks == 0) table.SecondsBetweenChecks = 15;
+
+                            #region Enable CT on table
+                            if (table.Status == null)
+                            {
+                                Logging.Log(String.Format("Starting from scratch {0} ", tableName));
+
+                                string TableCt = helper.UpdateString(ChangeTrackingTable, tableName);
+
+                                SQLServer.ExecuteNonQuery(connectionString, TableCt);
+
+                                table.Status = "Enabled";
+
+                                helper.SaveData(config, wholepath);
+                            }
+                            #endregion
+
+                            #region Check for changes
+                            if (table.Status == "Running")
+                            {
+                                // On the table's schedule, extract incremental changes
+                                if (table.LastUpdate.AddSeconds(table.SecondsBetweenChecks) < DateTime.Now)
+                                {
+
+                                    //Logging.Log(String.Format("Checking for updates: {0} ", tableName));
+
+                                    string extractQuery = dbC.ChangeIncrementalSQL;
+                                    extractQuery = UpdateQuery(dbC, table, tableName, extractQuery);
+
+                                    string locforTable = string.Format("{0}\\{1}.schema\\{2}\\", LocalLocationforTables, table.SchemaName, table.TableName);
+                                    string newfilename = helper.GetFileVersionName(locforTable);
+                                    string parquetFilePath = Path.Combine(locforTable, $"{newfilename}.parquet");
+
+                                    string justTablepath = string.Format("/{0}.schema/{1}/{2}.parquet", table.SchemaName, table.TableName, newfilename);
+                                    string justMetadatapath = string.Format("/{0}.schema/{1}/_metadata.json", table.SchemaName, table.TableName);
+                                    string removePath = string.Format("{0}.schema/{1}", table.SchemaName, table.TableName);
+
+                                    if (SQLServer.ExecuteRSWritePQ(connectionString, extractQuery, parquetFilePath))
+                                    {
+                                        Logging.Log(String.Format("Found updates : {0} ", tableName));
+                                        Upload.CopyChangesToOnelake(config, parquetFilePath, justTablepath);
+                                        table.DeltaVersion = newfilename;
+                                        dbC.Highwatermark = SQLServer.ExecuteScalar(connectionString, dbC.HighwatermarkSQL);
+                                    }
+
+                                    table.LastUpdate = DateTime.Now;
+                                    helper.SaveData(config, wholepath);
+                                }
+                            }
+                            #endregion
+
+                            #region Get a copy of the table / snapshot
+                            if (table.Status == "Enabled")
+                            {
+                                // Get Initial Snapshot
+                                string extractQuery = dbC.FullDataExtractQuery;
+                                extractQuery = UpdateQuery(dbC, table, tableName, extractQuery);
+
+                                Logging.Log(String.Format("Generating Snapshot: {0} ", tableName));
+
+                                string locforTable = string.Format("{0}\\{1}.schema\\{2}\\", LocalLocationforTables, table.SchemaName, table.TableName);
+
+                                helper.DeleteFolders(locforTable);
+
+                                helper.CreateFolders(locforTable);
+                                helper.CreateJSONMetadata(locforTable, table.KeyColumn);
+                                string newfilename = helper.GetFileVersionName(locforTable);
+                                string parquetFilePath = Path.Combine(locforTable, $"{newfilename}.parquet");
+
+
+                                string justTablepath = string.Format("/{0}.schema/{1}/{2}.parquet", table.SchemaName, table.TableName, newfilename);
+                                string justMetadatapath = string.Format("/{0}.schema/{1}/_metadata.json", table.SchemaName, table.TableName);
+                                string removePath = string.Format("{0}.schema/{1}", table.SchemaName, table.TableName);
+
+                                Upload.RemoveChangesToOnelake(config, removePath);
+
+                                SQLServer.ExecuteRSWritePQ(connectionString, extractQuery, parquetFilePath);
+
+                                Upload.CopyChangesToOnelake(config, String.Format("{0}{1}", locforTable, "_metadata.json"), justMetadatapath);
+                                Upload.CopyChangesToOnelake(config, parquetFilePath, justTablepath);
+
+                                dbC.Highwatermark = SQLServer.ExecuteScalar(connectionString, dbC.HighwatermarkSQL);
+                                table.DeltaVersion = newfilename;
+                                table.Status = "Running";
+                                table.LastUpdate = DateTime.Now;
+                                helper.SaveData(config, wholepath);
+                            }
+                            #endregion
+                            Thread.Sleep(1000);
+
+                        }
+                    }
+                }
+
+                #endregion
+            }
+        }
+
+        Console.ReadLine();
+
+
+    }
+
+    private static string UpdateQuery(DatabaseConfig dbC, TableConfig table, string tableName, string extractQuery)
+    {
+        extractQuery = extractQuery.Replace("{KeyColumn}", table.KeyColumn);
+        extractQuery = extractQuery.Replace("{OtherColumns}", table.OtherColumns);
+        extractQuery = extractQuery.Replace("{AdditionalColumns}", table.AdditionalColumns);
+        extractQuery = extractQuery.Replace("{t}", tableName);
+        extractQuery = extractQuery.Replace("{t1}", table.TableName);
+        extractQuery = extractQuery.Replace("{h}", dbC.Highwatermark);
+        extractQuery = extractQuery.Replace("{SoftDelete}", table.SoftDelete);
+        return extractQuery;
+    }
+
+    private static void OnCreated(object sender, FileSystemEventArgs e)
+    {
+        Logging.Log($"Change found {e.FullPath}");
+        string fileExtension = Path.GetExtension(e.FullPath).ToLower();
+
+        switch(fileExtension)
+        {
+            case ".xlsx":
+                Excel.ImportFile(e.FullPath, config);
+                break;
+            case ".csv":
+                CSV.ExtractCSV(e.FullPath, config);
+                break;
+            case ".accdb":
+                Access.ExtractAccess(e.FullPath, config);
+                break;
+        }
+
+    }
+
+
+}
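To make `UpdateQuery`'s template substitution concrete, here is a minimal sketch; the config values are hypothetical, taken from the shape of `mirrorconfig.json` below, and the `Replace` chain mirrors what `UpdateQuery` does above:

```csharp
using System;
using SQLMirroring;

class UpdateQueryDemo
{
    static void Main()
    {
        // Hypothetical config objects; {KeyColumn}, {OtherColumns}, {t} and {h}
        // are the placeholders UpdateQuery rewrites.
        var db = new DatabaseConfig
        {
            Highwatermark = "84",
            ChangeIncrementalSQL =
                "select changetb.{KeyColumn}, {OtherColumns} FROM CHANGETABLE (CHANGES {t}, {h}) AS changetb"
        };
        var table = new TableConfig
        {
            SchemaName = "dbo", TableName = "Persons3",
            KeyColumn = "PersonID", OtherColumns = "LastName,FirstName"
        };

        string sql = db.ChangeIncrementalSQL
            .Replace("{KeyColumn}", table.KeyColumn)
            .Replace("{OtherColumns}", table.OtherColumns)
            .Replace("{t}", $"{table.SchemaName}.{table.TableName}")
            .Replace("{h}", db.Highwatermark);

        // -> select changetb.PersonID, LastName,FirstName
        //    FROM CHANGETABLE (CHANGES dbo.Persons3, 84) AS changetb
        Console.WriteLine(sql);
    }
}
```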
diff --git a/open-mirroring/GenericMirroring/README.md b/open-mirroring/GenericMirroring/README.md
new file mode 100644
index 0000000..1136eeb
--- /dev/null
+++ b/open-mirroring/GenericMirroring/README.md
@@ -0,0 +1,50 @@
+# Generic Mirroring
+
+The aim of this POC (Proof of Concept) code is to show how easy it is to set up Open Mirroring and mirror some data.
+It is completely driven by the config file: you will need to configure it so it connects to the correct sources and Mirrored databases.
+
+This is an 'uber' project, including all the sources in one project.
+
+## Sources
+This project is a combination of multiple Mirroring sources. It can mirror:
+1. SQL Server 2008-2022 (using Change Tracking)
+1. Excel
+1. CSV
+1. Access
+1. TODO: Sharepoint
+1. TODO: Dedicated SQL Pool
+
+## Many to Many Mirroring
+The most recent changes allow for what I am calling "many to many" Mirroring.
+It allows for **1 or many sources** to be Mirrored/replicated to **1 or many Mirrored databases**.
+
+
+1. Many SQL Servers all Mirroring to one Mirrored database.
+Useful if you have a multi-tenanted architecture and want to consolidate all the reporting in one centralised hub.
+
+
+1. One master source being mirrored to many different Mirrored databases.
+Useful if you have a centralised data hub and need to push out changes to many downstream systems.
+
+## Mirrored database as a source for CDC (SQL Server only)
+The change tracking information is collected and sent to the Mirrored database, allowing the Mirrored database to be used as a source for OLTP/raw data.
+
+There is also a 'soft' delete option (for SQL Server) - deletes are only marked as deleted in the Mirrored database - this allows the delete to be propagated to downstream systems.
+
+
+## Process
+This solution will:
+1. Enable Change Tracking on the source database (if needed) and on the table (if needed). It should not interfere if you already have CDC/Change Tracking enabled.
+1. Create a snapshot or initial extract from SQL Server/source.
+1. Upload the extracted data (as a Parquet file) to the Mirroring Landing Zone.
+1. On a custom schedule per table, extract any changes on the table and upload them to the landing zone.
+
+# Instructions
+1. Compile the solution using VSCode or Visual Studio 2022 - I used Community Edition.
+1. Edit the config file, to include the SQL Server, tables you want to Mirror and the SPN Application ID, SPN Secret and SPN Tenant. (All the details are in the [youtube video](https://youtu.be/Gg3YlGyy5P8), and there is a [separate youtube video](https://youtu.be/85xWqWHfWbU) for creating an SPN.)
+1. [Create the Mirrored database](https://youtu.be/tiHHw2Hj848), copy the Landing Zone to the config file.
+1. Run the program.
+
+
+
diff --git a/open-mirroring/GenericMirroring/SimpleCache.cs b/open-mirroring/GenericMirroring/SimpleCache.cs
new file mode 100644
index 0000000..f0dda87
--- /dev/null
+++ b/open-mirroring/GenericMirroring/SimpleCache.cs
@@ -0,0 +1,68 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace GenericMirroring
+{
+    public class SimpleCache<TKey, TValue>
+    {
+        private readonly Dictionary<TKey, CacheItem> _cache = new Dictionary<TKey, CacheItem>();
+        private readonly TimeSpan _defaultTTL;
+
+        public SimpleCache(TimeSpan defaultTTL)
+        {
+            _defaultTTL = defaultTTL;
+        }
+
+        public void Add(TKey key, TValue value, TimeSpan? ttl = null)
+        {
+            var expiration = DateTime.UtcNow.Add(ttl ?? _defaultTTL);
+            _cache[key] = new CacheItem(value, expiration);
+        }
+
+        public bool TryGetValue(TKey key, out TValue value)
+        {
+            if (_cache.TryGetValue(key, out CacheItem cacheItem))
+            {
+                if (DateTime.UtcNow <= cacheItem.Expiration)
+                {
+                    value = cacheItem.Value;
+                    return true;
+                }
+                else
+                {
+                    // Expired item, remove it from the cache
+                    _cache.Remove(key);
+                }
+            }
+
+            value = default;
+            return false;
+        }
+
+        public void Remove(TKey key)
+        {
+            _cache.Remove(key);
+        }
+
+        public void Clear()
+        {
+            _cache.Clear();
+        }
+
+        private class CacheItem
+        {
+            public TValue Value { get; }
+            public DateTime Expiration { get; }
+
+            public CacheItem(TValue value, DateTime expiration)
+            {
+                Value = value;
+                Expiration = expiration;
+            }
+        }
+    }
+}
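`SimpleCache` backs the Excel/CSV/Access sources below, keyed by a hash of the output path with the last `DataTable` seen for that path. A minimal usage sketch (key and table are hypothetical):

```csharp
using System;
using System.Data;
using GenericMirroring;

class SimpleCacheDemo
{
    static void Main()
    {
        // Entries default to a 10-hour TTL, matching the sources below.
        var cache = new SimpleCache<string, DataTable>(TimeSpan.FromHours(10));

        cache.Add("hash-of-output-dir", new DataTable("Sheet1"));

        // A hit within the TTL returns the previously captured table, which the
        // sources compare against a fresh extract to skip unchanged uploads.
        if (cache.TryGetValue("hash-of-output-dir", out DataTable previous))
        {
            Console.WriteLine(previous.TableName); // Sheet1
        }
    }
}
```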
psScript.AppendLine($"$env:AZCOPY_SPA_APPLICATION_ID = \"{upd.SPN_Application_ID}\";\r\n"); + psScript.AppendLine($"$env:AZCOPY_SPA_CLIENT_SECRET = \"{upd.SPN_Secret}\";\r\n"); + psScript.AppendLine($"$env:AZCOPY_TENANT_ID = \"{upd.SPN_Tenant}\";\r\n"); + psScript.AppendLine($"$env:AZCOPY_PATH = \"{upd.PathtoAZCopy}\""); + psScript.AppendLine($"{upd.PathtoAZCopy}azcopy.exe remove \"{upd.LandingZone.Replace(".dfs.", ".dfs.")}/{justFile}\" --from-to=BlobFSTrash --recursive --trusted-microsoft-suffixes=onelake.dfs.fabric.microsoft.com --log-level=INFO;\r\n"); + + File.WriteAllText(ps1File, psScript.ToString()); + + Thread.Sleep(1500); + + var startInfo = new ProcessStartInfo() + { + FileName = "powershell.exe", + Arguments = $"-NoProfile -ExecutionPolicy ByPass -File \"{ps1File}\"", + UseShellExecute = false + }; + + Thread.Sleep(1500); + Process.Start(startInfo); + Thread.Sleep(1500); + + try + { + Thread.Sleep(500); + File.Delete(ps1File); + } + catch (Exception ex) + { + // Dont worry about this erroring, it means the folder doesnt exist + Logging.Log("Error in RemoveChangesToOnelake Removing file:{0}", ex.Message); + } + } + catch (Exception ex) + { + // Dont worry about this erroring, it means the folder doesnt exist + Logging.Log("Error in RemoveChangesToOnelake:{0}", ex.Message); + } + } + } + + } +} diff --git a/open-mirroring/GenericMirroring/azcopy.exe b/open-mirroring/GenericMirroring/azcopy.exe new file mode 100644 index 0000000..56e8fb5 Binary files /dev/null and b/open-mirroring/GenericMirroring/azcopy.exe differ diff --git a/open-mirroring/GenericMirroring/helper.cs b/open-mirroring/GenericMirroring/helper.cs new file mode 100644 index 0000000..6d89b3a --- /dev/null +++ b/open-mirroring/GenericMirroring/helper.cs @@ -0,0 +1,211 @@ +using GenericMirroring; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using System.Security.Cryptography; +using System.Data; + +namespace SQLMirroring +{ + static public class helper { + static public Root ReadConfig(string filePath) + { + try + { + string json = File.ReadAllText(filePath); + return JsonSerializer.Deserialize(json); + } + catch (Exception ex) + { + Logging.Log($"Error reading configuration file: {ex.Message}"); + return null; + } + } + + static public void SaveData(T data, string filePath) + { + lock(data) + { + try + { + string json = JsonSerializer.Serialize(data, new JsonSerializerOptions { WriteIndented = true }); + File.WriteAllText(filePath, json); + //Logging.Log("Data saved successfully."); + } + catch (Exception ex) + { + Logging.Log($"Error saving data: {ex.Message}"); + } + } + } + + static public T LoadData(string filePath) + { + try + { + if (File.Exists(filePath)) + { + string json = File.ReadAllText(filePath); + return JsonSerializer.Deserialize(json); + } + Logging.Log("File not found, returning default data."); + return default; + } + catch (Exception ex) + { + Logging.Log($"Error loading data: {ex.Message}"); + return default; + } + } + + + static public string UpdateString(string orginalString, string d) + { + return orginalString.Replace("{d}", d); + } + + static public void DeleteFolders(string path) + { + if (Directory.Exists(path)) + { + Directory.Delete(path, true); + // Log($"The folder '{path}' has been deleted."); + } + } + + static public void CreateFolders(string path) + { + if (!Directory.Exists(path)) + { + Directory.CreateDirectory(path); + //Log($"The folder '{path}' has been created."); + } + } + + 
+        static public void CreateJSONMetadata(string outputpath, string keycolumn)
+        {
+            string filePath = $"{outputpath}\\_metadata.json";
+
+            if (!File.Exists(filePath))
+            {
+
+                // Content to write
+                StringBuilder sb = new StringBuilder();
+                sb.Append("{ \"keyColumns\" : [ \"");
+                sb.Append(keycolumn);
+                sb.Append("\" ] }");
+
+                File.WriteAllText(filePath, sb.ToString());
+
+                //Log($"Config File '{filePath}' has been rewritten.");
+            }
+        }
+
+        static public string GetFileVersionName(string folder)
+        {
+            double versionnumber = 0;
+            // Check if the directory exists
+            if (Directory.Exists(folder))
+            {
+                // Get all files in the directory
+                string[] files = Directory.GetFiles(folder);
+
+                foreach (var file in files)
+                {
+                    //Log(file);
+                    double v = 0;
+                    string fileNameWithoutExtension = Path.GetFileNameWithoutExtension(file);
+
+                    if (double.TryParse(fileNameWithoutExtension, out v))
+                    {
+                        if (v > versionnumber) versionnumber = v;
+                    }
+                }
+            }
+            else
+            {
+                Logging.Log("Directory does not exist.");
+            }
+
+            versionnumber++;
+
+            return versionnumber.ToString("00000000000000000000");
+        }
+
+        public static string ComputeHash(string input)
+        {
+            // Convert the input string to a byte array
+            byte[] inputBytes = System.Text.Encoding.UTF8.GetBytes(input);
+
+            // Create an instance of SHA256
+            using (SHA256 sha256 = SHA256.Create())
+            {
+                // Compute the hash
+                byte[] hashBytes = sha256.ComputeHash(inputBytes);
+
+                // Convert the hash to a hexadecimal string
+                StringBuilder sb = new StringBuilder();
+                foreach (byte b in hashBytes)
+                {
+                    sb.Append(b.ToString("x2"));
+                }
+                return sb.ToString();
+            }
+        }
+
+        static public Boolean DoDatatablesMatch(DataTable table1, DataTable table2)
+        {
+            // TODO : put some code in to work out the rows that have changed.
+
+            Boolean rt = false;
+            // Compare schemas (columns)
+            if (table1.Columns.Count != table2.Columns.Count)
+            {
+                //Logging.Log("Schemas do not match: Different column counts.");
+                return rt;
+            }
+
+            for (int i = 0; i < table1.Columns.Count; i++)
+            {
+                if (table1.Columns[i].ColumnName != table2.Columns[i].ColumnName ||
+                    table1.Columns[i].DataType != table2.Columns[i].DataType)
+                {
+                    // Logging.Log($"Schemas do not match: Column {i + 1} differs.");
+                    return rt;
+                }
+            }
+            //Logging.Log("Schemas match.");
+
+            // Compare row counts
+            if (table1.Rows.Count != table2.Rows.Count)
+            {
+                //Logging.Log("Row counts do not match.");
+                return rt;
+            }
+
+            //Logging.Log("Comparing data...");
+
+            // Compare rows
+            for (int i = 0; i < table1.Rows.Count; i++)
+            {
+                for (int j = 0; j < table1.Columns.Count; j++)
+                {
+                    if (!Equals(table1.Rows[i][j], table2.Rows[i][j]))
+                    {
+                        Logging.Log($"Difference found at Row {i + 1}, Column {table1.Columns[j].ColumnName}: " +
+                            $"Table1='{table1.Rows[i][j]}', Table2='{table2.Rows[i][j]}'");
+                        return rt;
+                    }
+                }
+            }
+
+            //Logging.Log("Comparison complete.");
+            return true;
+
+        }
+
+    }
+}
diff --git a/open-mirroring/GenericMirroring/mirrorconfig.json b/open-mirroring/GenericMirroring/mirrorconfig.json
new file mode 100644
index 0000000..0c174fd
--- /dev/null
+++ b/open-mirroring/GenericMirroring/mirrorconfig.json
@@ -0,0 +1,90 @@
+{
+  "SQLChangeTrackingConfig": [
+    {
+      "ConnectionString": "Server=ServerA;Database={d};User Id=login;Password=your_password;TrustServerCertificate=True;",
+      "Type": "your_table_name",
+      "ChangeTrackingEnabled": "Enabled",
+      "syncVersion": 1,
+      "Tables": [
+        {
+          "TableName": "Persons3",
+          "SchemaName": "dbo",
+          "LastUpdate": "2025-01-28T22:36:38.4969622+00:00",
+          "SecondsBetweenChecks": 1,
+          "KeyColumn": "PersonID",
"OtherColumns": "LastName,FirstName,Address,City ", + "AdditionalColumns": " getdate() _lastupdated_ ", + "DeltaVersion": "00000000000000000001", + "SoftDelete": "1" + } + ], + "LocalLocationforTables": "C:\\repos\\SQLMirroring\\temp", + "DatabaseName": "demo_changetracking", + "ChangeTrackingSQL": "ALTER DATABASE {d} SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);", + "ChangeTrackingTable": "ALTER TABLE {d} ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON) ", + "Highwatermark": "84", + "HighwatermarkSQL": "select CHANGE_TRACKING_CURRENT_VERSION ( ) ", + "ChangeIncrementalSQL": "select case changetb.SYS_CHANGE_OPERATION WHEN \u0027D\u0027 THEN {SoftDelete} ELSE 1 END __rowMarker__ , changetb.{KeyColumn} , {OtherColumns} ,{AdditionalColumns} , convert(varchar(10),changetb.SYS_CHANGE_VERSION) SYS_CHANGE_VERSION , convert(varchar(10),changetb.SYS_CHANGE_CREATION_VERSION) SYS_CHANGE_CREATION_VERSION, changetb.SYS_CHANGE_OPERATION FROM CHANGETABLE (CHANGES {t}, {h}) AS changetb left outer join {t} rowdata on changetb.{KeyColumn} = rowdata.{KeyColumn}\t ", + "FullDataExtractQuery": "select 0 __rowMarker__ , rowdata.{KeyColumn} , {OtherColumns} ,{AdditionalColumns} , convert(varchar(10),changetb.SYS_CHANGE_VERSION) SYS_CHANGE_VERSION , convert(varchar(10),changetb.SYS_CHANGE_CREATION_VERSION) SYS_CHANGE_CREATION_VERSION, changetb.SYS_CHANGE_OPERATION FROM CHANGETABLE (CHANGES {t}, 0) AS changetb right outer join {t} rowdata on changetb.{KeyColumn} = rowdata.{KeyColumn} " + }, + { + "ConnectionString": "Server=ServerB;Database={d};User Id=database login;Password=Your PAssword;TrustServerCertificate=True;", + "Type": "your_table_name", + "ChangeTrackingEnabled": "Enabled", + "syncVersion": 1, + "Tables": [ + { + "TableName": "AlbumSales", + "SchemaName": "dbo", + "LastUpdate": "2025-01-28T22:36:29.3841447+00:00", + "SecondsBetweenChecks": 15, + "KeyColumn": "AlbumID ", + "OtherColumns": "AlbumName , ArtistName , ReleaseDate , Sales ", + "AdditionalColumns": " getdate() _lastupdated_ ", + "DeltaVersion": "00000000000000000001", + "SoftDelete": "1" + } + ], + "LocalLocationforTables": "C:\\repos\\SQLMirroring\\temp", + "DatabaseName": "demo_db2", + "ChangeTrackingSQL": "ALTER DATABASE {d} SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);", + "ChangeTrackingTable": "ALTER TABLE {d} ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON) ", + "Highwatermark": "0", + "HighwatermarkSQL": "select CHANGE_TRACKING_CURRENT_VERSION ( ) ", + "ChangeIncrementalSQL": "select case changetb.SYS_CHANGE_OPERATION WHEN \u0027D\u0027 THEN {SoftDelete} ELSE 1 END __rowMarker__ , changetb.{KeyColumn} , {OtherColumns} ,{AdditionalColumns} , convert(varchar(10),changetb.SYS_CHANGE_VERSION) SYS_CHANGE_VERSION , convert(varchar(10),changetb.SYS_CHANGE_CREATION_VERSION) SYS_CHANGE_CREATION_VERSION, changetb.SYS_CHANGE_OPERATION FROM CHANGETABLE (CHANGES {t}, {h}) AS changetb left outer join {t} rowdata on changetb.{KeyColumn} = rowdata.{KeyColumn}\t ", + "FullDataExtractQuery": "select 0 __rowMarker__ , rowdata.{KeyColumn} , {OtherColumns} ,{AdditionalColumns} , convert(varchar(10),changetb.SYS_CHANGE_VERSION) SYS_CHANGE_VERSION , convert(varchar(10),changetb.SYS_CHANGE_CREATION_VERSION) SYS_CHANGE_CREATION_VERSION, changetb.SYS_CHANGE_OPERATION FROM CHANGETABLE (CHANGES {t}, 0) AS changetb right outer join {t} rowdata on changetb.{KeyColumn} = rowdata.{KeyColumn} " + } + ], + "ExcelMirroringConfig": { + "folderToWatch": "C:\\temp\\watch", + "outputFolder": 
"C:\\repos\\SQLMirroring\\temp" + }, + "uploadDetails": [ + { + "SPN_Application_ID": "SPN_APP_ID", + "SPN_Secret": "SECRET", + "SPN_Tenant": "TENANT", + "LandingZone": "https://onelake.dfs.fabric.microsoft.com//workspace_ID/Mirrored_db_1/Files/LandingZone", + "PathtoAZCopy": "C:\\repos\\SQLMirroring\\" + }, + { + "SPN_Application_ID": "SPN_APP_ID", + "SPN_Secret": "SECRET", + "SPN_Tenant": "TENANT", + "LandingZone": "https://onelake.dfs.fabric.microsoft.com/workspace_ID/Mirrored_db_2/Files/LandingZone", + "PathtoAZCopy": "C:\\repos\\SQLMirroring\\" + } + ], + "LocationLogging": "C:\\Users\\maprycem\\source\\repos\\SQLMirroring\\logging", + "AccessMirroringConfig": { + "folderToWatch": "C:\\temp\\watch", + "outputFolder": "C:\\Users\\maprycem\\source\\repos\\SQLMirroring\\temp", + "IncludeFolders": false + }, + "CSVMirroringConfig": { + "folderToWatch": "C:\\temp\\watch", + "outputFolder": "C:\\Users\\maprycem\\source\\repos\\SQLMirroring\\temp" + }, + "Gen2MirroringConfig": null, + "SharepointMirroringConfig": null +} \ No newline at end of file diff --git a/open-mirroring/GenericMirroring/sources/Access.cs b/open-mirroring/GenericMirroring/sources/Access.cs new file mode 100644 index 0000000..7785473 --- /dev/null +++ b/open-mirroring/GenericMirroring/sources/Access.cs @@ -0,0 +1,141 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Data.OleDb; +using SQLMirroring; + +namespace GenericMirroring.sources +{ + public class Access + { + static public SimpleCache cache = new SimpleCache(TimeSpan.FromHours(10)); + + public static void ExtractAccess(string filePath, Root config) + { + // Path to your Access database + string databasePath = filePath; + string fileName = Path.GetFileName(filePath); + string fileNameWithoutExtension = Path.GetFileNameWithoutExtension(filePath); + + Logging.Log($"Exporting worksheet: {databasePath}"); + + + // Connection string for Access database + string connectionString = $@"Provider=Microsoft.ACE.OLEDB.12.0;Data Source={databasePath};Persist Security Info=False;"; + + try + { + // List of DataTables to hold the data from each table and view + var dataTables = ConvertAllAccessTablesAndViewsToDataTables(connectionString, config); + + // Display data for demonstration + foreach (var table in dataTables) + { + string newOutputdir = $"{config.AccessMirroringConfig.outputFolder}\\{fileNameWithoutExtension}.schema\\{table.TableName}"; + Console.WriteLine($"Object Name: {table.TableName}"); + + bool firstRun = true; + bool changes = true; + if (cache.TryGetValue(helper.ComputeHash(newOutputdir), out DataTable value)) + { + firstRun = false; + if (helper.DoDatatablesMatch(table, value)) + { + // the table is in the cache and has not changed + changes = false; + } + } + + if (changes) + { + cache.Add(helper.ComputeHash(newOutputdir), table); + + string LocalLocationforTables = config.AccessMirroringConfig.outputFolder; + + string locforTable = string.Format("{0}\\{1}.schema\\{2}\\", LocalLocationforTables, fileNameWithoutExtension, table.TableName); + string newfilename = helper.GetFileVersionName(locforTable); + string parquetFilePath = Path.Combine(locforTable, $"{newfilename}.parquet"); + + string justTablepath = string.Format("/{0}.schema/{1}/{2}.parquet", fileNameWithoutExtension, table.TableName, newfilename); + string justMetadatapath = string.Format("/{0}.schema/{1}/_metadata.json", fileNameWithoutExtension, table.TableName, newfilename); + 
diff --git a/open-mirroring/GenericMirroring/sources/Access.cs b/open-mirroring/GenericMirroring/sources/Access.cs
new file mode 100644
index 0000000..7785473
--- /dev/null
+++ b/open-mirroring/GenericMirroring/sources/Access.cs
@@ -0,0 +1,141 @@
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.Data.OleDb;
+using SQLMirroring;
+
+namespace GenericMirroring.sources
+{
+    public class Access
+    {
+        static public SimpleCache<string, DataTable> cache = new SimpleCache<string, DataTable>(TimeSpan.FromHours(10));
+
+        public static void ExtractAccess(string filePath, Root config)
+        {
+            // Path to your Access database
+            string databasePath = filePath;
+            string fileName = Path.GetFileName(filePath);
+            string fileNameWithoutExtension = Path.GetFileNameWithoutExtension(filePath);
+
+            Logging.Log($"Exporting Access database: {databasePath}");
+
+
+            // Connection string for Access database
+            string connectionString = $@"Provider=Microsoft.ACE.OLEDB.12.0;Data Source={databasePath};Persist Security Info=False;";
+
+            try
+            {
+                // List of DataTables to hold the data from each table and view
+                var dataTables = ConvertAllAccessTablesAndViewsToDataTables(connectionString, config);
+
+                // Display data for demonstration
+                foreach (var table in dataTables)
+                {
+                    string newOutputdir = $"{config.AccessMirroringConfig.outputFolder}\\{fileNameWithoutExtension}.schema\\{table.TableName}";
+                    Console.WriteLine($"Object Name: {table.TableName}");
+
+                    bool firstRun = true;
+                    bool changes = true;
+                    if (cache.TryGetValue(helper.ComputeHash(newOutputdir), out DataTable value))
+                    {
+                        firstRun = false;
+                        if (helper.DoDatatablesMatch(table, value))
+                        {
+                            // the table is in the cache and has not changed
+                            changes = false;
+                        }
+                    }
+
+                    if (changes)
+                    {
+                        cache.Add(helper.ComputeHash(newOutputdir), table);
+
+                        string LocalLocationforTables = config.AccessMirroringConfig.outputFolder;
+
+                        string locforTable = string.Format("{0}\\{1}.schema\\{2}\\", LocalLocationforTables, fileNameWithoutExtension, table.TableName);
+                        string newfilename = helper.GetFileVersionName(locforTable);
+                        string parquetFilePath = Path.Combine(locforTable, $"{newfilename}.parquet");
+
+                        string justTablepath = string.Format("/{0}.schema/{1}/{2}.parquet", fileNameWithoutExtension, table.TableName, newfilename);
+                        string justMetadatapath = string.Format("/{0}.schema/{1}/_metadata.json", fileNameWithoutExtension, table.TableName);
+                        //helper.DeleteFolders(locforTable);
+
+                        if (firstRun == true)
+                        {
+                            helper.CreateFolders(locforTable);
+                            helper.CreateJSONMetadata(locforTable, "_id_");
+                            Upload.CopyChangesToOnelake(config, string.Format("{0}{1}", locforTable, "_metadata.json"), justMetadatapath);
+                        }
+
+                        ParquetDump.WriteDataTableToParquet(table, parquetFilePath);
+                        Upload.CopyChangesToOnelake(config, parquetFilePath, justTablepath);
+                    }
+                }
+            }
+            catch (Exception ex)
+            {
+                Logging.Log($"Error: {ex.Message}");
+            }
+
+        }
+
+        static List<DataTable> ConvertAllAccessTablesAndViewsToDataTables(string connectionString, Root config)
+        {
+            var dataTables = new List<DataTable>();
+
+            using (var connection = new OleDbConnection(connectionString))
+            {
+                connection.Open();
+
+                // Get schema information for tables and views
+                var schemaTable = connection.GetSchema("Tables");
+                foreach (DataRow row in schemaTable.Rows)
+                {
+                    string objectName = row["TABLE_NAME"].ToString();
+                    string objectType = row["TABLE_TYPE"].ToString();
+
+                    // Include both tables and views
+                    if (objectType == "TABLE" || objectType == "VIEW")
+                    {
+                        // Load object data into a DataTable
+                        var dataTable = new DataTable(objectName);
+                        using (var command = new OleDbCommand($"SELECT * FROM [{objectName}]", connection))
+                        using (var adapter = new OleDbDataAdapter(command))
+                        {
+                            try
+                            {
+                                adapter.Fill(dataTable);
+
+                                dataTable.Columns.Add("__rowMarker__", typeof(int));
+                                dataTable.Columns.Add("_id_", typeof(int));
+
+                                int rowCount = dataTable.Rows.Count;
+                                for (int i = 0; i < rowCount; i++)
+                                {
+
+                                    dataTable.Rows[i]["__rowMarker__"] = 1;
+                                    dataTable.Rows[i]["_id_"] = i;
+
+                                }
+                                dataTables.Add(dataTable);
+                            }
+                            catch (Exception ex)
+                            {
+                                Logging.Log($"Error reading {objectType} '{objectName}': {ex.Message}");
+                            }
+                        }
+                    }
+                }
+            }
+
+            return dataTables;
+        }
+
+
+
+    }
+}
diff --git a/open-mirroring/GenericMirroring/sources/CSV.cs b/open-mirroring/GenericMirroring/sources/CSV.cs
new file mode 100644
index 0000000..7f52649
--- /dev/null
+++ b/open-mirroring/GenericMirroring/sources/CSV.cs
@@ -0,0 +1,114 @@
+using SQLMirroring;
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace GenericMirroring.sources
+{
+    public static class CSV
+    {
+        static public SimpleCache<string, DataTable> cache = new SimpleCache<string, DataTable>(TimeSpan.FromHours(10));
+
+        static public void ExtractCSV(string filePath, Root config)
+        {
+            Logging.Log($"ExtractCSV {filePath}");
+            Thread.Sleep(1000);
+            DataTable table = ConvertCsvToDataTable(filePath);
+            string tablename = Path.GetFileNameWithoutExtension(filePath);
+            string fileNameWithoutExtension = "dbo";// Path.GetFileNameWithoutExtension(filePath);
+            string newOutputdir = $"{config.CSVMirroringConfig.outputFolder}\\{fileNameWithoutExtension}";
+
+            bool firstRun = true;
+            if (cache.TryGetValue(helper.ComputeHash(newOutputdir), out DataTable value))
+            {
+                firstRun = false;
+                if (helper.DoDatatablesMatch(table, value))
+                {
+                    // the table is in the cache and has not changed
+                    return;
+                }
+            }
+            cache.Add(helper.ComputeHash(newOutputdir), table);
+
+            string LocalLocationforTables = config.CSVMirroringConfig.outputFolder;
+
+            string locforTable = string.Format("{0}\\{1}.schema\\{2}\\", LocalLocationforTables, fileNameWithoutExtension, tablename);
+            string newfilename = helper.GetFileVersionName(locforTable);
+            string parquetFilePath = Path.Combine(locforTable, $"{newfilename}.parquet");
+
+            string justTablepath = string.Format("/{0}.schema/{1}/{2}.parquet", fileNameWithoutExtension, tablename, newfilename);
+            string justMetadatapath = string.Format("/{0}.schema/{1}/_metadata.json", fileNameWithoutExtension, tablename);
+            //helper.DeleteFolders(locforTable);
+
+            if (firstRun == true)
+            {
+                helper.CreateFolders(locforTable);
+                helper.CreateJSONMetadata(locforTable, "_id_");
+                Upload.CopyChangesToOnelake(config, string.Format("{0}{1}", locforTable, "_metadata.json"), justMetadatapath);
+            }
+
+            ParquetDump.WriteDataTableToParquet(table, parquetFilePath);
+            Upload.CopyChangesToOnelake(config, parquetFilePath, justTablepath);
+
+        }
+
+        public static DataTable ConvertCsvToDataTable(string filePath)
+        {
+            Logging.Log($"ConvertCsvToDataTable {filePath}");
+
+            DataTable dataTable = new DataTable();
+
+            try
+            {
+                // Read all lines from the CSV file
+                string[] csvLines = File.ReadAllLines(filePath);
+
+                if (csvLines.Length == 0)
+                {
+                    Logging.Log("The CSV file is empty.");
+                    return dataTable;
+                }
+
+                // Add columns to the DataTable using the first row
+                string[] headers = csvLines[0].Split(',');
+
+                dataTable.Columns.Add("__rowMarker__", typeof(int));
+                dataTable.Columns.Add("_id_", typeof(int));
+
+                foreach (string header in headers)
+                {
+                    // Console.WriteLine($" header-{header.Trim()}");
+
+                    dataTable.Columns.Add(header.Trim());
+                }
+
+                // Add rows to the DataTable for each subsequent line
+                for (int i = 1; i < csvLines.Length; i++)
+                {
+                    string[] rowValues = csvLines[i].Split(',');
+                    DataRow dataRow = dataTable.NewRow();
+
+                    dataRow[0] = 1;
+                    dataRow[1] = i;
+
+                    for (int j = 0; j < headers.Length && j < rowValues.Length; j++)
+                    {
+                        dataRow[j + 2] = rowValues[j].Trim();
+                        // Console.WriteLine($" data-{rowValues[j].Trim()}");
+                    }
+
+                    dataTable.Rows.Add(dataRow);
+                }
+            }
+            catch (Exception ex)
+            {
+                Logging.Log($"Error processing CSV file: {ex.Message}");
+            }
+
+            return dataTable;
+        }
+    }
+}
diff --git a/open-mirroring/GenericMirroring/sources/Excel.cs b/open-mirroring/GenericMirroring/sources/Excel.cs
new file mode 100644
index 0000000..58b8064
--- /dev/null
+++ b/open-mirroring/GenericMirroring/sources/Excel.cs
@@ -0,0 +1,203 @@
+using Microsoft.Identity.Client;
+using SQLMirroring;
+using System;
+using System.Collections.Generic;
+using System.ComponentModel;
+using System.Data;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using LicenseContext = OfficeOpenXml.LicenseContext;
+using OfficeOpenXml;
+
+namespace GenericMirroring.sources
+{
+    public static class Excel
+    {
+        public static void ImportFile(string FullPathtoExcelDocument, Root config)
+        {
+            Thread.Sleep(500);
+
+            string fileExtension = Path.GetExtension(FullPathtoExcelDocument).ToLower();
+
+            Logging.Log($"Importing: {FullPathtoExcelDocument}");
+            Logging.Log($"fileExtension: {fileExtension}");
+
+            if (FullPathtoExcelDocument.Contains("~")) // ignore temp files
+            {
+                return;
+            }
+
+            if (fileExtension == ".xlsx")
+                ConvertExcelToDatatable(FullPathtoExcelDocument, config);
+
+        }
+
+        static public SimpleCache<string, DataTable> cache = new SimpleCache<string, DataTable>(TimeSpan.FromHours(10));
+        static public void ConvertExcelToDatatable(string filePathtoExcel, Root config)
+        {
+            try
+            {
+                Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);
+
+                string fileName = Path.GetFileName(filePathtoExcel);
+                string fileNameWithoutExtension = Path.GetFileNameWithoutExtension(fileName);
+
+                ExcelPackage.LicenseContext = LicenseContext.NonCommercial;
+
+                using (var package = new ExcelPackage(new FileInfo(filePathtoExcel)))
+                {
diff --git a/open-mirroring/GenericMirroring/sources/SQLServer.cs b/open-mirroring/GenericMirroring/sources/SQLServer.cs
new file mode 100644
index 0000000..f4ccf4e
--- /dev/null
+++ b/open-mirroring/GenericMirroring/sources/SQLServer.cs
@@ -0,0 +1,156 @@
+using Microsoft.Data.SqlClient;
+using SQLMirroring;
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace GenericMirroring.sources
+{
+    public static class SQLServer
+    {
+        public static SqlDataReader ExecuteRS(string connectionString, string query)
+        {
+            try
+            {
+                // The connection is deliberately not wrapped in a using block:
+                // CommandBehavior.CloseConnection ties its lifetime to the reader,
+                // so disposing the reader closes the connection. Returning a reader
+                // from inside a using block would hand the caller a closed reader.
+                SqlConnection connection = new SqlConnection(connectionString);
+                connection.Open();
+
+                SqlCommand command = new SqlCommand(query, connection);
+                return command.ExecuteReader(CommandBehavior.CloseConnection);
+            }
+            catch (Exception ex)
+            {
+                Logging.Log($"An error occurred: {ex.Message}");
+                return null;
+            }
+        }
+
+        public static bool ExecuteRSWritePQ(string connectionString, string query, string filePath)
+        {
+            try
+            {
+                // Create and open a connection to SQL Server
+                using (SqlConnection connection = new SqlConnection(connectionString))
+                {
+                    connection.Open();
+
+                    using (SqlCommand command = new SqlCommand(query, connection))
+                    using (SqlDataReader reader = command.ExecuteReader())
+                    {
+                        if (reader.HasRows)
+                        {
+                            ParquetDump.WriteDataTableToParquet(reader, filePath);
+                            return true;
+                        }
+                        return false;
+                    }
+                }
+            }
+            catch (Exception ex)
+            {
+                Logging.Log($"An error occurred: {ex.Message}");
+                return false;
+            }
+        }
+
+        public static void ExecuteNonQuery(string connectionString, string query)
+        {
+            try
+            {
+                // Create and open a connection to SQL Server
+                using (SqlConnection connection = new SqlConnection(connectionString))
+                {
+                    connection.Open();
+
+                    using (SqlCommand command = new SqlCommand(query, connection))
+                    {
+                        command.ExecuteNonQuery();
+                    }
+                }
+            }
+            catch (Exception ex)
+            {
+                Logging.Log($"ExecuteNonQuery: An error occurred: {ex.Message}");
+            }
+        }
+
+        public static string ExecuteScalar(string connectionString, string query)
+        {
+            try
+            {
+                // Create and open a connection to SQL Server
+                using (SqlConnection connection = new SqlConnection(connectionString))
+                {
+                    connection.Open();
+
+                    using (SqlCommand command = new SqlCommand(query, connection))
+                    {
+                        // ExecuteScalar returns null for an empty result set
+                        return command.ExecuteScalar()?.ToString() ?? string.Empty;
+                    }
+                }
+            }
+            catch (Exception ex)
+            {
+                Logging.Log($"An error occurred: {ex.Message}");
+                return string.Empty;
+            }
+        }
+    }
+}
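For the change-tracking workflow the README describes, these helpers would typically be driven by the standard CHANGETABLE pattern. A usage sketch follows; the connection string, dbo.Orders table, OrderID key, and output path are placeholders, and how lastVersion is persisted between runs is left open.

// Current change tracking version, used as the high-water mark for the next sync
string connStr = "Server=.;Database=Sales;Integrated Security=true;TrustServerCertificate=true;";
string version = SQLServer.ExecuteScalar(connStr, "SELECT CHANGE_TRACKING_CURRENT_VERSION();");

// Rows changed since the last sync (lastVersion would be loaded from saved state)
long lastVersion = 0;
string extractQuery = $@"
    SELECT t.*, ct.SYS_CHANGE_OPERATION
    FROM CHANGETABLE(CHANGES dbo.Orders, {lastVersion}) AS ct
    JOIN dbo.Orders AS t ON t.OrderID = ct.OrderID;";

bool wroteFile = SQLServer.ExecuteRSWritePQ(connStr, extractQuery, @"C:\mirror\orders.parquet");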
diff --git a/open-mirroring/README.md b/open-mirroring/README.md
index 5d6813f..c659d89 100644
--- a/open-mirroring/README.md
+++ b/open-mirroring/README.md
@@ -1,7 +1,5 @@
 # Open Mirroring
 
-This is a collection of Opening Mirroring solutions.
-
 This is a collection of open-source demos/proofs of concept for mirroring different sources using Open Mirroring.
@@ -15,7 +13,6 @@ Part 2 - [Create a Mirrored database](https://youtu.be/tiHHw2Hj848)
 
 Part 3 - [Setting Up Permissions for Excel Mirroring in Microsoft Fabric](https://youtu.be/85xWqWHfWbU)
 
-
 ## SQLServerChangeTracking
 
 This is the on-prem SQL Server Open Mirroring solution, for SQL Server 2008 and above (although I have only tested SQL Server 2017 on Docker). It is designed to show how you can simply and quickly set up mirroring from virtually any SQL Server by using change tracking.
@@ -24,6 +21,3 @@ This is the on-prem SQL Server Open Mirroring solution, for SQL Server 2008 and
 
 Project for mirroring Excel documents to Fabric.
-
-
-
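Change tracking has to be enabled at both database and table level before the SQLServerChangeTracking solution can read deltas. The statements below follow the documented T-SQL syntax, issued here through the ExecuteNonQuery helper from SQLServer.cs; the Sales database and dbo.Orders table are placeholders.

string connStr = "Server=.;Database=Sales;Integrated Security=true;TrustServerCertificate=true;";

// Enable change tracking on the database, keeping two days of change history
SQLServer.ExecuteNonQuery(connStr,
    "ALTER DATABASE Sales SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);");

// Enable change tracking on each table to be mirrored
SQLServer.ExecuteNonQuery(connStr,
    "ALTER TABLE dbo.Orders ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON);");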
diff --git a/open-mirroring/SQLServerChangeTracking/README.md b/open-mirroring/SQLServerChangeTracking/README.md
index bc76f59..e20dcb2 100644
--- a/open-mirroring/SQLServerChangeTracking/README.md
+++ b/open-mirroring/SQLServerChangeTracking/README.md
@@ -15,6 +15,7 @@ This solution will:
 1. On a custom schedule per table, extract any changes on the table and upload them to the landing zone.
 
 # Instructions
+
 1. Compile the solution using VSCode or Visual Studio 2022 - I used Community Edition.
 1. Edit the config file to include the SQL Server, the tables you want to mirror, and the SPN Application ID, SPN Secret, and SPN Tenant. (All the details are in the [youtube video](https://youtu.be/Gg3YlGyy5P8); there is a [separate youtube video](https://youtu.be/85xWqWHfWbU) for creating an SPN.)
 1. [Create the Mirrored database](https://youtu.be/tiHHw2Hj848), and copy the Landing Zone to the config file.
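Step 2 above edits the JSON config that this code base deserializes into the Root type seen throughout the sources. One way to load it is sketched below; System.Text.Json is an assumption (the solution may use a different serializer), and only the two outputFolder properties are confirmed by the code in this diff, so the SPN and landing-zone fields would follow whatever schema ships with the solution.

// requires: using System.IO; using System.Text.Json;
Root config = JsonSerializer.Deserialize<Root>(File.ReadAllText("config.json"));

// The two settings the CSV and Excel sources actually read
Logging.Log($"CSV output folder: {config.CSVMirroringConfig.outputFolder}");
Logging.Log($"Excel output folder: {config.ExcelMirroringConfig.outputFolder}");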