# HoDA_Radio/app/core/dicom_manager.py
# 2025-09-26 17:51:43 +02:00
#
# 387 lines
# 15 KiB
# Python

from numpy import clip, stack, array
from os import path, listdir
from pydicom import dcmread
from PyQt6.QtGui import QImage, QIcon, QPixmap
from pydicom.multival import MultiValue
from pydicom.fileset import FileSet
from app.core.export_manager import ExportCategory
from PyQt6.QtCore import QObject, pyqtSignal
import json, pandas
class Dicom:
    """A named node of a DICOM tree.

    A node stores its children in ``childs``; children are either nested
    ``Dicom`` nodes or ``DicomFile`` objects. Besides the children, a node
    carries a display name, the path it was loaded from, an optional mask,
    an export category and an export flag.
    """

    def __init__(self, files=None, name='', path=None):
        """Create a node.

        Args:
            files: Initial list of children. ``None`` (or any falsy value)
                results in a new empty list.
            name: Name of the node.
            path: Path the node was loaded from, if any.
        """
        self.childs = files or []
        self.name = name
        self.path = path
        self.mask = None
        self.category = ExportCategory.A
        self.export = False  # nodes are not exported until explicitly flagged

    def set_export(self, export):
        """Set the export status of this dicom.

        Raises:
            ValueError: If ``export`` is not a ``bool``.
        """
        if not isinstance(export, bool):
            raise ValueError("Export status must be a boolean")
        self.export = export

    def get_export(self):
        """Get the export status of this dicom."""
        return self.export

    def get_category(self):
        """Get the category associated with this dicom."""
        return self.category

    def set_category(self, category):
        """Set the category associated with this dicom."""
        self.category = category

    def get_mask(self):
        """Get the mask associated with this dicom."""
        return self.mask

    def set_mask(self, mask):
        """Set the mask for this dicom."""
        self.mask = mask

    def get_path(self):
        """Return the path this dicom was created with."""
        return self.path

    def add_file_or_folder(self, file):
        """Append ``file`` to the children unless it is already present."""
        if file not in self.childs:
            self.childs.append(file)

    def add_files_or_folders(self, files):
        """Append every item of ``files`` that is not already a child.

        Items are added one at a time so that duplicates inside ``files``
        itself are also filtered, without depending on how ``list.extend``
        consumes a generator.
        """
        for file in files:
            self.add_file_or_folder(file)

    def get_icon(self):
        """Return the icon of the first child, or ``None`` when empty."""
        return self.childs[0].get_icon() if self.childs else None

    def get_subfolders(self):
        """Return the children that are ``Dicom`` nodes."""
        return [child for child in self.childs if isinstance(child, Dicom)]

    def get_files(self):
        """Return the children that are ``DicomFile`` objects."""
        return [child for child in self.childs if isinstance(child, DicomFile)]

    def get_children_types(self):
        """Return the type of the first child, or ``None`` when empty."""
        return type(self.childs[0]) if self.childs else None

    def get_name(self):
        """Return the name of this dicom."""
        return self.name

    def is_empty(self):
        """Return ``True`` when this dicom has no children."""
        return not self.childs

    def _first_file(self):
        """Return the first ``DicomFile`` child, or ``None`` if there is none."""
        files = self.get_files()
        return files[0] if files else None

    def export_metadata_json(self, destPath):
        """Export the first file's metadata as JSON into ``destPath``.

        Does nothing when this dicom has no direct ``DicomFile`` child.
        """
        dicom_file = self._first_file()
        if dicom_file:
            dicom_file.export_metadata_json(destPath)

    def export_metadata_xls(self, destPath):
        """Export the first file's metadata as a spreadsheet into ``destPath``.

        Does nothing when this dicom has no direct ``DicomFile`` child.
        """
        dicom_file = self._first_file()
        if dicom_file:
            dicom_file.export_metadata_xls(destPath)

    def __repr__(self):
        return f"Dicom(name={self.name}, path={self.path}, files={len(self.childs)}, mask={self.mask is not None}, export={self.export}, category={self.category})"
class DicomFile:
    """A single DICOM file whose dataset, image and icon are loaded lazily.

    Nothing is read from disk in the constructor. The header is read on first
    use by ``_ensure_loaded`` and the pixel data only when an image is
    requested. ``is_valid`` is ``None`` until the first load attempt, then
    ``True`` or ``False``; ``error_message`` holds the text of the last
    loading or rendering error.
    """

    def __init__(self, file_path):
        """Remember ``file_path``; no I/O happens here."""
        self.file_path = file_path
        self.name = path.basename(file_path)
        self.image = None  # cached result of get_scaled_image()
        self.icon = None  # cached QIcon built from the image
        self.is_valid = None  # None means "not validated yet"
        self.error_message = None
        self.ds = None  # dataset, populated by _ensure_loaded()
        self._metadata_loaded = False
        # Set once a full re-read succeeded, so a file without pixel data is
        # not re-read on every image request.
        self._pixel_load_attempted = False

    def _ensure_loaded(self):
        """Lazy loading of DICOM dataset (header only, pixels skipped)."""
        if self.ds is None and self.is_valid is None:
            try:
                # Small defer_size keeps large elements on disk until accessed.
                self.ds = dcmread(self.file_path, defer_size="1 KB", stop_before_pixels=True)
                self.is_valid = True
            except Exception as e:
                self.is_valid = False
                self.error_message = str(e)
                self.ds = None

    def _ensure_pixel_data_loaded(self):
        """Load pixel data only when needed.

        Re-reads the file without ``stop_before_pixels`` when the current
        dataset exposes no ``pixel_array``. A failure is recorded in
        ``error_message`` and leaves the header-only dataset in place.
        """
        if self.ds is None or self._pixel_load_attempted:
            return
        if hasattr(self.ds, 'pixel_array'):
            return
        try:
            self.ds = dcmread(self.file_path, defer_size="100 MB")
            self._pixel_load_attempted = True
        except Exception as e:
            self.error_message = str(e)

    def get_image(self):
        """Return the cached display image, rendering it on first use.

        Returns:
            The array produced by ``get_scaled_image`` or ``None`` when the
            file is invalid or cannot be rendered.
        """
        self._ensure_loaded()
        if not self.is_valid:
            return None
        if self.image is None:
            # get_scaled_image() takes care of loading the pixel data.
            self.image = self.get_scaled_image()
        return self.image

    def get_name(self):
        """Return the base name of the file."""
        return self.name

    def get_icon(self):
        """Return a cached ``QIcon`` of the image, or ``None`` on failure."""
        self._ensure_loaded()
        if not self.is_valid:
            return None
        if self.icon is None:
            try:
                image = self.get_image()
                if image is not None:
                    self.icon = QIcon(QPixmap.fromImage(DicomManager().array_to_image(image)))
            except Exception:
                self.icon = None
        return self.icon

    @staticmethod
    def _first_value(value):
        """Return the first item of a multi-valued element, else ``value``."""
        if value is None or isinstance(value, (str, bytes)):
            return value
        if hasattr(value, '__getitem__'):
            return value[0]
        return value

    def get_scaled_image(self):
        """Render the pixel data as an 8-bit array.

        Steps: apply ``RescaleSlope``/``RescaleIntercept``, clip to the first
        ``WindowCenter``/``WindowWidth`` pair when both are present and the
        width is positive, stretch the remaining range to 0..255, invert for
        ``MONOCHROME1`` and replicate 2-D data into three channels.

        Returns:
            A ``uint8`` array, or ``None`` when the file is invalid, has no
            pixel data, or rendering fails (the error text is stored in
            ``error_message``).
        """
        self._ensure_loaded()
        if not self.is_valid or self.ds is None:
            return None
        self._ensure_pixel_data_loaded()
        try:
            if not hasattr(self.ds, 'pixel_array'):
                return None
            pixels = self.ds.pixel_array.astype('float32')
            slope = float(getattr(self.ds, 'RescaleSlope', 1.0))
            intercept = float(getattr(self.ds, 'RescaleIntercept', 0.0))
            pixels = pixels * slope + intercept

            center = self._first_value(self.ds.get('WindowCenter', None))
            width = self._first_value(self.ds.get('WindowWidth', None))
            if center is not None and width is not None and float(width) > 0:
                half_width = float(width) / 2
                pixels = clip(pixels, float(center) - half_width, float(center) + half_width)

            lowest = float(pixels.min())
            span = float(pixels.max()) - lowest
            if span > 0:
                normalized = (pixels - lowest) / span
                # The window is defined on the stored values, so MONOCHROME1
                # is inverted only after windowing and normalisation.
                if self.ds.get('PhotometricInterpretation', '').upper() == 'MONOCHROME1':
                    normalized = 1.0 - normalized
            else:
                # Constant image: avoid 0/0 and render it uniformly black.
                normalized = clip(pixels, 0.0, 0.0)

            scaled = (normalized * 255.0).astype('uint8')
            if scaled.ndim == 2:  # Grayscale image
                scaled = stack([scaled, scaled, scaled], axis=-1)
            return scaled
        except Exception as e:
            self.error_message = str(e)
            return None

    def get_metadata(self, metadata: str):
        """Return the dataset value for ``metadata``, or ``""`` if unavailable."""
        self._ensure_loaded()
        if self.ds is None:
            return ""
        return self.ds.get(metadata, "")

    @staticmethod
    def _json_ready(value):
        """Convert a dataset value into something JSON can serialise."""
        if hasattr(value, '__iter__') and not isinstance(value, (str, bytes)):
            # Handle sequences and arrays
            try:
                return list(value)
            except Exception:
                return str(value)
        if hasattr(value, 'decode') and callable(value.decode):
            # Handle bytes
            try:
                return value.decode('utf-8', errors='ignore')
            except Exception:
                return str(value)
        if not isinstance(value, (str, int, float, bool, type(None))):
            return str(value)
        return value

    def get_all_metadata(self):
        """Get all metadata with proper type conversion for JSON serialization.

        Returns:
            A dict mapping each element's name (or the tag's string form when
            the element has no name) to its converted value. Empty when the
            file cannot be read. Elements that fail to convert are skipped.
        """
        self._ensure_loaded()
        if self.ds is None:
            return {}
        # Load full dataset for metadata export
        # NOTE(review): this re-read does not skip pixel data, so the pixel
        # data element is part of the result too - confirm that is intended.
        if not self._metadata_loaded:
            try:
                self.ds = dcmread(self.file_path, defer_size="100 MB")
                self._metadata_loaded = True
            except Exception:
                return {}
        metadata = {}
        for tag in self.ds.keys():
            try:
                element = self.ds[tag]
                value = self._json_ready(element.value)
                # Use tag name if available, otherwise use hex representation
                tag_name = element.name if hasattr(element, 'name') and element.name else str(tag)
                metadata[tag_name] = value
            except Exception:
                # If there's any error with a specific tag, skip it
                continue
        return metadata

    def set_metadata(self, metadata, value):
        """Set the dataset attribute named ``metadata`` to ``value``.

        Raises:
            ValueError: If ``metadata`` is not a string, ``value`` is not a
                string, int or float, or the file could not be loaded.
        """
        if not isinstance(metadata, str):
            raise ValueError("Metadata key must be a string")
        if not isinstance(value, (str, int, float)):
            raise ValueError("Metadata value must be a string, int, or float")
        self._ensure_loaded()
        if self.ds is None:
            raise ValueError(f"Cannot set metadata on unreadable DICOM file {self.name}")
        # Item assignment on a dataset requires a DataElement; attribute
        # assignment accepts the plain value.
        setattr(self.ds, metadata, value)

    def export_metadata_json(self, destPath):
        """Export metadata to ``<destPath>/<name>_metadata.json``.

        Raises:
            Exception: Wrapping any failure, with the file name as context.
        """
        try:
            metadata = self.get_all_metadata()
            json_path = path.join(destPath, f"{self.name}_metadata.json")
            with open(json_path, 'w', encoding='utf-8') as json_file:
                json.dump(metadata, json_file, indent=4, ensure_ascii=False, default=str)
        except Exception as e:
            # Re-raise with more context
            raise Exception(f"Error exporting JSON for {self.name}: {str(e)}") from e

    def export_metadata_xls(self, destPath):
        """Export metadata to ``<destPath>/<name>_metadata.xlsx``.

        Raises:
            Exception: Wrapping any failure, with the file name as context.
        """
        try:
            metadata = self.get_all_metadata()
            scalar_types = (str, int, float, bool, type(None))
            # Spreadsheet cells only take scalars; multi-valued elements come
            # back from get_all_metadata() as lists and are written as text.
            metadata_list = [
                (tag, value if isinstance(value, scalar_types) else str(value))
                for tag, value in metadata.items()
            ]
            df = pandas.DataFrame(metadata_list, columns=['Tag', 'Value'])
            xls_path = path.join(destPath, f"{self.name}_metadata.xlsx")
            # The workbook is written with the openpyxl engine.
            with pandas.ExcelWriter(xls_path, engine='openpyxl') as writer:
                df.to_excel(writer, sheet_name='Metadata', index=False)
        except Exception as e:
            # Re-raise with more context
            raise Exception(f"Error exporting XLS for {self.name}: {str(e)}") from e

    def extract_metadata(self, imagePath, toGet=None):
        """Return metadata of the first record in ``imagePath`` that has any.

        Args:
            imagePath: Path handed to ``FileSet``.
            toGet: Optional keys to extract; missing keys map to ``""``.
                When empty or ``None`` the whole metadata object is returned.

        Returns:
            The metadata, or ``{}`` when nothing is found or an error occurs.

        Raises:
            ValueError: If ``imagePath`` is not a string.
        """
        if not isinstance(imagePath, str):
            raise ValueError("Image path must be a string")
        try:
            file_set = FileSet(imagePath)
            for record in file_set:
                # NOTE(review): relies on each record exposing get_metadata();
                # confirm against the installed pydicom version.
                metadata = record.get_metadata()
                if not metadata:
                    continue
                # Extract only the requested metadata keys
                if toGet:
                    return {key: metadata.get(key, "") for key in toGet}
                return metadata
        except Exception:
            return {}
        return {}

    def __repr__(self):
        return f"DicomFile(path={self.file_path}, name={self.name})"
class DicomManager(QObject):
    """Qt object that loads DICOM sources into ``Dicom`` trees.

    It also converts between ``QImage`` objects and arrays.

    Signals:
        progress_changed(int): Percentage emitted while a source is scanned.
        error_occurred(str): Emitted with ``"load_error"`` when loading a
            source raises.
    """

    progress_changed = pyqtSignal(int)
    error_occurred = pyqtSignal(str)  # New signal for errors

    def __init__(self, dicoms=None):
        """Create a manager.

        Args:
            dicoms: Optional list to use as storage. When omitted every
                manager gets its own new list.
        """
        super().__init__()
        self.dicoms = [] if dicoms is None else dicoms

    def add_dicom(self, dicom):
        """Register a dicom, a path to load, or a list of either.

        A path that cannot be loaded (``process_dicomdir`` returns ``None``)
        is skipped, so ``self.dicoms`` only ever holds ``Dicom`` objects.

        Raises:
            TypeError: If ``dicom`` is not a ``Dicom``, ``str`` or ``list``.
        """
        if isinstance(dicom, Dicom):
            self.dicoms.append(dicom)
        elif isinstance(dicom, str):
            loaded = self.process_dicomdir(dicom)
            if loaded is not None:
                self.dicoms.append(loaded)
        elif isinstance(dicom, list):
            for item in dicom:
                self.add_dicom(item)
        else:
            raise TypeError("Expected Dicom, str or list, got {}".format(type(dicom)))

    def get_dicoms(self):
        """Return the list of registered dicoms."""
        return self.dicoms

    def get_dicom_paths(self):
        """Return the path of every registered dicom."""
        return [dicom.get_path() for dicom in self.dicoms]

    def process_dicomdir(self, dicomdirPath):
        """Build a ``Dicom`` from a DICOMDIR, a single file or a folder.

        Args:
            dicomdirPath: Path ending in ``DICOMDIR``, path of a ``.dcm``
                file (any letter case), or a directory holding ``.dcm`` files.

        Returns:
            The resulting ``Dicom``, or ``None`` when a directory holds no
            ``.dcm`` file or when loading raises. In the latter case
            ``error_occurred`` is emitted with ``"load_error"``.
        """
        try:
            if dicomdirPath.endswith('DICOMDIR'):
                return self._load_from_dicomdir(dicomdirPath)
            if dicomdirPath.lower().endswith('.dcm'):
                # Process single DCM file with lazy loading
                return Dicom([DicomFile(dicomdirPath)], path.basename(dicomdirPath), dicomdirPath)
            return self._load_from_directory(dicomdirPath)
        except Exception:
            self.error_occurred.emit("load_error")
            return None

    def _load_from_dicomdir(self, dicomdirPath):
        """Group the IMAGE records of a DICOMDIR by their parent folder."""
        image_files = []
        sub_dirs = []
        current_name = ""
        root_name = path.basename(path.dirname(dicomdirPath))
        ds = dcmread(dicomdirPath)
        if hasattr(ds, 'DirectoryRecordSequence'):
            base_path = path.dirname(dicomdirPath)
            records = ds.DirectoryRecordSequence
            total_records = len(records)
            for i, record in enumerate(records):
                # Emit progress for DICOM processing
                if i % 5 == 0:  # More frequent updates
                    self.progress_changed.emit(int((i / total_records) * 100))
                if record.DirectoryRecordType != "IMAGE" or not hasattr(record, 'ReferencedFileID'):
                    continue
                file_id = record.ReferencedFileID
                # A one-component file ID is a plain string; wrap it so it is
                # not unpacked character by character below.
                parts = [file_id] if isinstance(file_id, str) else list(file_id)
                image_path = path.join(base_path, *parts)
                name = parts[-2] if len(parts) >= 2 else ""
                if name != current_name:
                    # Close the previous group before starting a new one.
                    if image_files:
                        sub_dirs.append(Dicom(image_files, current_name))
                    current_name = name
                    image_files = []
                # Create DicomFile without immediate validation
                image_files.append(DicomFile(image_path))
        if image_files:  # Don't forget the last group
            sub_dirs.append(Dicom(image_files, current_name))
        # A single group is flattened: its files hang directly off the root.
        if len(sub_dirs) <= 1 and image_files:
            return Dicom(image_files, root_name, dicomdirPath)
        return Dicom(sub_dirs, root_name, dicomdirPath)

    def _load_from_directory(self, dicomdirPath):
        """Collect the ``.dcm`` files that sit directly inside a directory."""
        # listdir() order is arbitrary; sort for a reproducible file order.
        file_list = sorted(f for f in listdir(dicomdirPath) if f.lower().endswith('.dcm'))
        total_files = len(file_list)
        dcm_files = []
        for i, dcm_file in enumerate(file_list):
            # Emit progress for DCM file processing
            if i % 10 == 0:  # Update every 10 files
                self.progress_changed.emit(int((i / total_files) * 100))
            # Create DicomFile without immediate validation
            dcm_files.append(DicomFile(path.join(dicomdirPath, dcm_file)))
        if not dcm_files:
            return None
        # Create Dicom object with all files (validation will happen later)
        return Dicom(dcm_files, path.basename(dicomdirPath), dicomdirPath)

    def image_to_array(self, qimg):
        """Return the pixels of ``qimg`` as a ``(height, width, 3)`` array.

        The image is converted to RGB888 when it is in another format, and
        the per-row padding reported by ``bytesPerLine()`` is dropped.
        """
        if qimg.format() != QImage.Format.Format_RGB888:
            qimg = qimg.convertToFormat(QImage.Format.Format_RGB888)
        height = qimg.height()
        width = qimg.width()
        stride = qimg.bytesPerLine()
        ptr = qimg.bits()
        ptr.setsize(height * stride)
        rows = array(ptr).reshape(height, stride)
        return rows[:, :width * 3].reshape(height, width, 3)

    def array_to_image(self, arr):
        """Return a ``QImage`` (RGB888) built from a ``(h, w, ch)`` array.

        The data is first copied into a C-contiguous ``uint8`` buffer, since
        the image constructor reads raw bytes row by row. The returned image
        is a deep copy and does not reference the array.
        """
        h, w, ch = arr.shape
        buffer = array(arr, dtype='uint8', order='C')
        return QImage(buffer.data, w, h, ch * w, QImage.Format.Format_RGB888).copy()