diff --git a/kolibri/core/discovery/test/test_filesystem_utils.py b/kolibri/core/discovery/test/test_filesystem_utils.py index 0d8187fbc47..11b2fe6137b 100644 --- a/kolibri/core/discovery/test/test_filesystem_utils.py +++ b/kolibri/core/discovery/test/test_filesystem_utils.py @@ -1,7 +1,8 @@ -import ntpath import os import posixpath +import sys +import pytest from django.test import TestCase from mock import patch @@ -116,6 +117,10 @@ def mocked_wmic_output(): return windows_data.wmic_csv +@pytest.mark.skipif( + sys.platform != "win32", + reason="This test is only for Windows platform", +) class WindowsFilesystemTestCase(TestCase): """ Test retrieval and parsing of disk info for Windows, using mocked command output. @@ -123,8 +128,6 @@ class WindowsFilesystemTestCase(TestCase): @patch_os_access(windows_data.os_access_read, windows_data.os_access_write) @patch_os_path_exists_for_kolibri_folder(windows_data.has_kolibri_data_folder) - @patch("sys.platform", "win32") - @patch("os.path", ntpath) @patch( "kolibri.core.discovery.utils.filesystem.windows._wmic_output", mocked_wmic_output, @@ -154,9 +157,24 @@ def test_drive_space(self): self.assertEqual(self.d_drive.totalspace, 58388480) def test_drive_names(self): - self.assertEqual(self.c_drive.name, "Local Fixed Disk") + self.assertEqual(self.c_drive.name, "C: (Local Fixed Disk)") self.assertEqual(self.d_drive.name, "VBOXADDITIONS_4.") + @patch("kolibri.core.discovery.utils.filesystem.windows._wmic_output") + def test_powershell_fallback(self, mock_wmic): + # Force wmic to fail by raising an exception with the specific error message + # that would trigger the PowerShell fallback + mock_wmic.side_effect = Exception( + "Could not run command 'wmic logicaldisk list full /format:csv > ...'" + ) + + # Now call the function - it should use the PowerShell fallback + drives = enumerate_mounted_disk_partitions() + # Check that the result is a dictionary + # On Github Actions, this seems to produce no listed drives + # so the best we can hope for is that the function above doesn't error. + self.assertIsInstance(drives, dict) + class LinuxFilesystemTestCase(TestCase): """ diff --git a/kolibri/core/discovery/utils/filesystem/windows.py b/kolibri/core/discovery/utils/filesystem/windows.py index bffd91ceff4..592d90716c1 100644 --- a/kolibri/core/discovery/utils/filesystem/windows.py +++ b/kolibri/core/discovery/utils/filesystem/windows.py @@ -1,7 +1,9 @@ import codecs import csv +import json import logging import os +import subprocess import tempfile import uuid @@ -20,11 +22,31 @@ ] +def _get_drive_name(drive, path): + # More robustly name drives with multiple fallbacks + if drive.get("VolumeName"): + return drive.get("VolumeName") + caption = drive.get("Caption") + description = drive.get("Description") + if caption and description: + return "{} ({})".format(caption, description) + elif caption: + return caption + elif description: + return description + return path + + def get_drive_list(): drives = [] - drive_list = _parse_wmic_csv_output(_wmic_output()) + try: + drive_list = _parse_wmic_csv_output(_wmic_output()) + except Exception as e: + if "Could not run command" not in str(e): + raise + drive_list = _get_drive_list_powershell() for drive in drive_list: @@ -53,11 +75,12 @@ def get_drive_list(): if not os.access(path, os.R_OK): continue + name = _get_drive_name(drive, path) # combine the metadata, using backup fields for missing pieces, and return drives.append( { "path": path, - "name": drive.get("VolumeName") or drive.get("Description"), + "name": name, "filesystem": drive.get("FileSystem").lower(), "freespace": int(drive.get("FreeSpace") or 0), "totalspace": int(drive.get("Size") or 0), @@ -137,3 +160,56 @@ def _parse_wmic_csv_output(text): # turn each row into a dict, mapping the header text of each column to the row's value for that column return [dict(zip(header, row)) for row in rows] + + +def _get_drive_list_powershell(): + """ + Get the list of drives using PowerShell when wmic is unavailable. + Returns data in a format compatible with the wmic csv output parsing. + """ + + # Create a unique temp file + temp_file_path = os.path.join( + tempfile.gettempdir(), "kolibri_disks_ps-{}.json".format(uuid.uuid4()) + ) + + # PowerShell command with explicit property selection + powershell_cmd = [ + "powershell", + "-NoProfile", + "-ExecutionPolicy", + "Bypass", + "-Command", + "Get-WmiObject -Class Win32_LogicalDisk | " + "Select-Object @{Name='DeviceID';Expression={$_.DeviceID}}, " + "@{Name='DriveType';Expression={$_.DriveType}}, " + "@{Name='Caption';Expression={$_.Caption}}, " + "@{Name='Description';Expression={$_.Description}}, " + "@{Name='VolumeName';Expression={$_.VolumeName}}, " + "@{Name='Size';Expression={$_.Size}}, " + "@{Name='FreeSpace';Expression={$_.FreeSpace}}, " + "@{Name='FileSystem';Expression={$_.FileSystem}}, " + "@{Name='VolumeSerialNumber';Expression={$_.VolumeSerialNumber}} | " + f"ConvertTo-Json | Out-File -FilePath '{temp_file_path}' -Encoding utf8", + ] + + # Run the command + subprocess.run(powershell_cmd, check=True, capture_output=True) + + disks_data = [] + + # Read and parse the JSON file + if os.path.exists(temp_file_path) and os.path.getsize(temp_file_path) > 0: + # Open the file with utf-8-sig encoding to handle BOM + with open(temp_file_path, "r", encoding="utf-8-sig") as f: + content = f.read().strip() + + disks_data = json.loads(content or "[]") + + # Handle case where only one disk is returned (not in a list) + if not isinstance(disks_data, list): + disks_data = [disks_data] + + os.remove(temp_file_path) + + return disks_data