HoDA_Radio/app/core/export_manager.py
2025-09-26 17:51:43 +02:00

568 lines
24 KiB
Python

import logging
import os
from datetime import datetime

import numpy as np
import pydicom
from PIL import Image
from pydicom.uid import generate_uid
from PyQt6.QtCore import QObject, pyqtSignal
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas

import app.utils.image_editor as image_editor
class ExportCategory:
    """Fixed palette of export categories, each identified by an RGB tuple.

    The ten class attributes ``A`` .. ``J`` are the category identifiers;
    ``CATEGORY_NAMES`` maps each identifier to the label used when building
    export folder names.
    """

    A = (180, 0, 0)
    B = (0, 180, 0)
    C = (0, 0, 180)
    D = (180, 180, 0)
    E = (180, 0, 180)
    F = (0, 180, 180)
    G = (120, 60, 0)
    H = (60, 120, 0)
    I = (0, 120, 60)
    J = (120, 0, 60)

    # Human-readable names for the categories.
    CATEGORY_NAMES = {
        A: "Rouge",
        B: "Vert",
        C: "Bleu",
        D: "Jaune",
        E: "Magenta",
        F: "Cyan",
        G: "Marron",
        H: "Olive",
        I: "Teal",
        J: "Bordeaux",
    }

    @classmethod
    def list(cls):
        """Return the ten category identifiers in order ``A`` .. ``J``."""
        return [getattr(cls, letter) for letter in "ABCDEFGHIJ"]

    @classmethod
    def get_name(cls, category):
        """Return the readable name of *category*.

        Categories missing from ``CATEGORY_NAMES`` get the fallback label
        ``"Groupe_<category>"``.
        """
        try:
            return cls.CATEGORY_NAMES[category]
        except KeyError:
            return f"Groupe_{category}"
_logger = logging.getLogger(__name__)


class ExportManager(QObject):
    """Export the DICOM entries marked for export as PDF, PNG, JSON, XLS or DICOMDIR.

    Every public ``export_*`` method follows the same protocol: it emits
    ``phase_changed("preparing_export")``, then a format-specific phase,
    reports progress through ``progress_changed`` and finally emits
    ``export_finished`` with the same boolean it returns.  Failures of a
    single file or sub-folder are logged and skipped (best effort); any other
    error aborts the export and is reported as ``False``.
    """

    # Progress-tracking signals.
    progress_changed = pyqtSignal(int)
    phase_changed = pyqtSignal(str)
    export_finished = pyqtSignal(bool)  # True = success, False = error

    # Names of the index files this class writes into a DICOM export folder;
    # they must never be listed as exported images themselves.
    _INDEX_FILE_NAMES = frozenset({"DICOMDIR", "INDEX.txt"})

    def __init__(self, dicom_manager):
        """Create a manager reading its entries from *dicom_manager*."""
        super().__init__()
        self.export_data = []  # Placeholder for data to export
        self.dicom_manager = dicom_manager
        self.export_destination = None  # Destination folder for exports

    # ------------------------------------------------------------------
    # Configuration and data selection
    # ------------------------------------------------------------------
    def set_export_data(self, data):
        """Store *data* as the export payload."""
        self.export_data = data

    def set_export_destination(self, destination_path):
        """Set the destination folder for exports."""
        self.export_destination = destination_path

    def get_export_folder_name(self, root_name, category):
        """Return ``"<root_name>_<readable category name>"``."""
        category_name = ExportCategory.get_name(category)
        return f"{root_name}_{category_name}"

    def get_export_dicom_list(self):
        """Return ``[parent, subfolders]`` pairs for every non-empty entry.

        An entry without sub-folders is paired with a one-element list
        containing the entry itself.
        """
        result = []
        for parent in self.dicom_manager.get_dicoms():
            if parent is None or parent.is_empty():  # Skip None or empty entries
                continue
            subfolders = parent.get_subfolders()
            result.append([parent, subfolders if subfolders else [parent]])
        return result

    def fetch_export_data(self):
        """Return the entries marked for export, one pair per (parent, category).

        Each element is ``[parent, subfolders]`` where all sub-folders share
        the same category.  An entry without sub-folders is returned as
        ``[parent, [parent]]`` when the entry itself is marked for export.
        """
        result = []
        for parent in self.dicom_manager.get_dicoms():
            if parent is None or parent.is_empty():  # Skip None or empty entries
                continue
            subfolders = parent.get_subfolders()
            if not subfolders:
                # Single-file DICOM: the parent stands in for its sub-folder.
                if parent.get_export():
                    result.append([parent, [parent]])
                continue
            # Group the checked sub-folders by category (insertion order kept).
            by_category = {}
            for subfolder in subfolders:
                if subfolder.get_export():
                    by_category.setdefault(subfolder.get_category(), []).append(subfolder)
            for category_subfolders in by_category.values():
                result.append([parent, category_subfolders])
        return result

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _fail(self):
        """Signal a failed export and return ``False``."""
        self.export_finished.emit(False)
        return False

    def _succeed(self):
        """Signal a completed export and return ``True``."""
        self.phase_changed.emit("export_complete")
        self.export_finished.emit(True)
        return True

    def _base_path(self):
        """Return the export destination, or the working directory if unset."""
        return self.export_destination if self.export_destination else os.getcwd()

    def _emit_progress(self, done, total, scale=100):
        """Emit ``done / total`` scaled to *scale* as an integer percentage."""
        self.progress_changed.emit(int((done / total) * scale))

    @staticmethod
    def _count_files(data):
        """Return the total number of files over all sub-folders in *data*."""
        return sum(
            len(subfolder.get_files())
            for _, subfolders in data
            for subfolder in subfolders
        )

    @staticmethod
    def _render_image(file, mask):
        """Return *file*'s image (masked if *mask* is given) as a PIL image, or ``None``."""
        image_array = file.get_image()
        if image_array is None:
            return None
        if mask is not None:
            image_array = image_editor.apply_mask(image_array, mask)
        if not isinstance(image_array, np.ndarray):
            return None
        return Image.fromarray(image_array.astype('uint8'))

    @staticmethod
    def _unique_path(folder, stem, extension, used_paths):
        """Return a path in *folder* not yet present in *used_paths* and record it.

        Output names are derived from input names with the last dotted
        component removed, so distinct inputs can map to the same stem; a
        numeric suffix keeps a later file from overwriting an earlier one.
        """
        candidate = os.path.join(folder, f"{stem}{extension}")
        counter = 1
        while candidate in used_paths:
            candidate = os.path.join(folder, f"{stem}_{counter}{extension}")
            counter += 1
        used_paths.add(candidate)
        return candidate

    # ------------------------------------------------------------------
    # Image exports
    # ------------------------------------------------------------------
    def export_images_as_pdf(self):
        """Export images as PDF (one document per entry/category, one page per image)."""
        self.phase_changed.emit("preparing_export")
        data = self.fetch_export_data()
        if not data:
            return self._fail()
        try:
            total_files = self._count_files(data)
            if total_files == 0:
                return self._fail()
            self.phase_changed.emit("exporting_pdf")
            processed_files = 0
            base_path = self._base_path()
            os.makedirs(base_path, exist_ok=True)
            for root, subfolders in data:
                # Use readable category name and destination
                folder_name = self.get_export_folder_name(
                    root.get_name(), subfolders[0].get_category()
                )
                pdf = canvas.Canvas(os.path.join(base_path, f"{folder_name}.pdf"))
                first_image = True
                for subfolder in subfolders:
                    files = subfolder.get_files()
                    mask = subfolder.get_mask()
                    if not files:
                        continue
                    for file in files:
                        pil_image = self._render_image(file, mask)
                        if pil_image is not None:
                            # Start a new page for every image but the first.
                            if not first_image:
                                pdf.showPage()
                            first_image = False
                            img_width, img_height = pil_image.size
                            pdf.setPageSize((img_width, img_height))
                            pdf.drawInlineImage(
                                pil_image, 0, 0, width=img_width, height=img_height
                            )
                        processed_files += 1
                        self._emit_progress(processed_files, total_files)
                pdf.save()
            return self._succeed()
        except Exception:
            _logger.exception("PDF export failed")
            return self._fail()

    def export_images_as_png(self):
        """Export images as PNG files (one folder per entry/category)."""
        self.phase_changed.emit("preparing_export")
        data = self.fetch_export_data()
        if not data:
            return self._fail()
        try:
            total_files = self._count_files(data)
            if total_files == 0:
                return self._fail()
            self.phase_changed.emit("exporting_png")
            processed_files = 0
            used_paths = set()
            base_path = self._base_path()
            for root, subfolders in data:
                # Use readable category name and destination
                folder_name = self.get_export_folder_name(
                    root.get_name(), subfolders[0].get_category()
                )
                png_folder = os.path.join(base_path, folder_name)
                os.makedirs(png_folder, exist_ok=True)
                for subfolder in subfolders:
                    files = subfolder.get_files()
                    mask = subfolder.get_mask()
                    if not files:
                        continue
                    for file in files:
                        pil_image = self._render_image(file, mask)
                        if pil_image is not None:
                            # Keep the original file name (minus its extension).
                            original_name = os.path.splitext(file.get_name())[0]
                            png_path = self._unique_path(
                                png_folder, original_name, ".png", used_paths
                            )
                            pil_image.save(png_path)
                        processed_files += 1
                        self._emit_progress(processed_files, total_files)
            return self._succeed()
        except Exception:
            _logger.exception("PNG export failed")
            return self._fail()

    # ------------------------------------------------------------------
    # Metadata exports
    # ------------------------------------------------------------------
    def _export_metadata(self, phase, exporter_name):
        """Run the sub-folder method *exporter_name* for every exported sub-folder.

        *phase* is the value emitted through ``phase_changed`` once the export
        starts.  A sub-folder whose exporter raises is logged and skipped.
        """
        self.phase_changed.emit("preparing_export")
        data = self.fetch_export_data()
        if not data:
            return self._fail()
        try:
            total_subfolders = sum(len(subfolders) for _, subfolders in data)
            if total_subfolders == 0:
                return self._fail()
            self.phase_changed.emit(phase)
            processed_subfolders = 0
            base_path = self._base_path()
            for root, subfolders in data:
                # Use readable category name and destination
                folder_name = self.get_export_folder_name(
                    root.get_name(), subfolders[0].get_category()
                )
                target_folder = os.path.join(base_path, folder_name)
                os.makedirs(target_folder, exist_ok=True)
                for subfolder in subfolders:
                    try:
                        getattr(subfolder, exporter_name)(target_folder)
                    except Exception:
                        # Continue with other subfolders even if one fails
                        _logger.exception(
                            "%s failed for a sub-folder; skipping it", exporter_name
                        )
                    # Count failed sub-folders too so progress reaches 100 %.
                    processed_subfolders += 1
                    self._emit_progress(processed_subfolders, total_subfolders)
            return self._succeed()
        except Exception:
            _logger.exception("Metadata export (%s) failed", phase)
            return self._fail()

    def export_metadata_as_json(self):
        """Export metadata as JSON with progress tracking."""
        return self._export_metadata("exporting_json", "export_metadata_json")

    def export_metadata_as_xls(self):
        """Export metadata as XLS with progress tracking."""
        return self._export_metadata("exporting_xls", "export_metadata_xls")

    # ------------------------------------------------------------------
    # DICOMDIR export
    # ------------------------------------------------------------------
    def export_images_as_dicomdir(self):
        """Export images as DICOM files plus a DICOMDIR index per entry/category.

        File copying accounts for the first 90 % of the reported progress;
        index creation for the remainder.  If a DICOMDIR cannot be written a
        plain-text ``INDEX.txt`` is written instead and the export is still
        reported as successful.
        """
        self.phase_changed.emit("preparing_export")
        data = self.fetch_export_data()
        if not data:
            return self._fail()
        try:
            total_files = self._count_files(data)
            if total_files == 0:
                return self._fail()
            self.phase_changed.emit("exporting_dicomdir")
            processed_files = 0
            used_paths = set()
            dicom_folders = []
            base_path = self._base_path()
            for root, subfolders in data:
                # Use readable category name and destination
                folder_name = self.get_export_folder_name(
                    root.get_name(), subfolders[0].get_category()
                )
                dicom_folder = os.path.join(base_path, f"{folder_name}_DICOMDIR")
                os.makedirs(dicom_folder, exist_ok=True)
                dicom_folders.append(dicom_folder)
                # One study per entry/category, one series per sub-folder.
                study_uid = generate_uid()
                for series_index, subfolder in enumerate(subfolders):
                    files = subfolder.get_files()
                    mask = subfolder.get_mask()
                    if not files:
                        continue
                    series_uid = generate_uid()
                    series_folder = os.path.join(dicom_folder, subfolder.get_name())
                    os.makedirs(series_folder, exist_ok=True)
                    for file_index, file in enumerate(files):
                        try:
                            self._export_dicom_file(
                                file,
                                mask,
                                series_folder,
                                used_paths,
                                study_uid=study_uid,
                                series_uid=series_uid,
                                series_number=series_index + 1,
                                instance_number=file_index + 1,
                                series_description=subfolder.get_name(),
                                study_description=root.get_name(),
                            )
                        except Exception:
                            _logger.exception("Could not export a DICOM file; skipping it")
                        # Count skipped files too so progress is not stalled.
                        processed_files += 1
                        self._emit_progress(processed_files, total_files, scale=90)
            # Create the index files once every image has been written.
            self.progress_changed.emit(95)
            for dicom_folder in dicom_folders:
                if not self._create_dicomdir_file(dicom_folder):
                    # If DICOMDIR creation fails, still consider export successful
                    # but create a simple index file
                    self._create_simple_index(dicom_folder)
            self.progress_changed.emit(100)
            return self._succeed()
        except Exception:
            _logger.exception("DICOMDIR export failed")
            return self._fail()

    def _export_dicom_file(self, file, mask, series_folder, used_paths, *, study_uid,
                           series_uid, series_number, instance_number,
                           series_description, study_description):
        """Write a re-identified (and optionally masked) copy of *file*.

        Returns ``True`` when a file was written and ``False`` when *file* has
        no dataset.  Exceptions propagate to the caller.
        """
        if not file.ds:
            return False
        # Re-read from disk so the in-memory dataset of *file* is not modified.
        new_ds = pydicom.dcmread(file.file_path)
        if mask is not None:
            image_array = file.get_image()
            if image_array is not None:
                masked_array = image_editor.apply_mask(image_array, mask)
                self._replace_pixel_data(new_ds, masked_array)

        # Update required DICOM attributes for DICOMDIR
        new_ds.StudyInstanceUID = study_uid
        new_ds.SeriesInstanceUID = series_uid
        new_ds.SOPInstanceUID = generate_uid()
        new_ds.InstanceNumber = str(instance_number)
        new_ds.SeriesNumber = str(series_number)
        # Keep the file meta header consistent with the new SOP Instance UID.
        file_meta = getattr(new_ds, "file_meta", None)
        if file_meta is not None and "MediaStorageSOPInstanceUID" in file_meta:
            file_meta.MediaStorageSOPInstanceUID = new_ds.SOPInstanceUID

        # Ensure required attributes exist
        now = datetime.now()
        defaults = {
            'StudyID': '1',
            'SeriesDescription': series_description,
            'StudyDescription': study_description,
            'PatientName': 'Anonymous',
            'PatientID': 'ANON001',
            'StudyDate': now.strftime('%Y%m%d'),
            'StudyTime': now.strftime('%H%M%S'),
        }
        for keyword, value in defaults.items():
            if not hasattr(new_ds, keyword):
                setattr(new_ds, keyword, value)

        # Save under the original name, without file extension, for DICOMDIR
        # compatibility.
        original_name = os.path.splitext(file.get_name())[0]
        output_path = self._unique_path(series_folder, original_name, "", used_paths)
        new_ds.save_as(output_path, write_like_original=False)
        return True

    @staticmethod
    def _replace_pixel_data(dataset, masked_array):
        """Store *masked_array* in *dataset* as uncompressed 16-bit MONOCHROME2 pixels."""
        if len(masked_array.shape) == 3:
            # Convert RGB to grayscale using standard weights
            grayscale = np.dot(
                masked_array[..., :3], [0.2989, 0.5870, 0.1140]
            ).astype(np.uint16)
        else:
            grayscale = masked_array.astype(np.uint16)
        # Update pixel data and related attributes
        dataset.PixelData = grayscale.tobytes()
        dataset.Rows = int(grayscale.shape[0])
        dataset.Columns = int(grayscale.shape[1])
        dataset.BitsAllocated = 16
        dataset.BitsStored = 16
        dataset.HighBit = 15
        dataset.SamplesPerPixel = 1
        dataset.PhotometricInterpretation = 'MONOCHROME2'
        dataset.PixelRepresentation = 0  # the samples are unsigned
        # The new pixel data is native (not encapsulated): clear the
        # undefined-length flag inherited from a compressed source and switch
        # a compressed transfer syntax to an uncompressed one, otherwise the
        # written file would not be decodable.
        dataset["PixelData"].is_undefined_length = False
        file_meta = getattr(dataset, "file_meta", None)
        transfer_syntax = getattr(file_meta, "TransferSyntaxUID", None)
        if transfer_syntax is not None and transfer_syntax.is_compressed:
            file_meta.TransferSyntaxUID = pydicom.uid.ExplicitVRLittleEndian

    def _create_dicomdir_file(self, dicom_folder):
        """Write a ``DICOMDIR`` listing every DICOM file below *dicom_folder*.

        Returns ``True`` on success and ``False`` if the file could not be
        created.  Files that cannot be parsed as DICOM are skipped.
        """
        try:
            # Create DICOMDIR manually since pydicom's FileSet might have issues
            from pydicom.dataset import Dataset, FileDataset
            from pydicom import uid

            # Create the DICOMDIR dataset
            file_meta = Dataset()
            file_meta.MediaStorageSOPClassUID = uid.MediaStorageDirectoryStorage
            file_meta.MediaStorageSOPInstanceUID = generate_uid()
            file_meta.ImplementationClassUID = generate_uid()
            file_meta.TransferSyntaxUID = uid.ExplicitVRLittleEndian

            # Create main dataset
            ds = FileDataset('DICOMDIR', {}, file_meta=file_meta, preamble=b"\0" * 128)
            ds.FileSetID = 'EXPORTED_STUDY'
            ds.DirectoryRecordSequence = []

            # Add a directory record for each DICOM file.  Exported files keep
            # their original names, so every file except the index files is a
            # candidate; anything pydicom cannot read is skipped.
            for root, dirs, files in os.walk(dicom_folder):
                dirs.sort()
                for name in sorted(files):
                    if name in self._INDEX_FILE_NAMES:
                        continue
                    file_path = os.path.join(root, name)
                    try:
                        # Read the DICOM file to get metadata
                        file_ds = pydicom.dcmread(file_path)
                        record = Dataset()
                        record.DirectoryRecordType = 'IMAGE'
                        # File reference relative to the DICOMDIR, one
                        # component per path element.
                        rel_path = os.path.relpath(file_path, dicom_folder)
                        record.ReferencedFileID = rel_path.replace('\\', '/').split('/')
                        # Add essential metadata
                        if hasattr(file_ds, 'StudyInstanceUID'):
                            record.StudyInstanceUID = file_ds.StudyInstanceUID
                        if hasattr(file_ds, 'SeriesInstanceUID'):
                            record.SeriesInstanceUID = file_ds.SeriesInstanceUID
                        if hasattr(file_ds, 'SOPInstanceUID'):
                            record.ReferencedSOPInstanceUIDInFile = file_ds.SOPInstanceUID
                        if hasattr(file_ds, 'InstanceNumber'):
                            record.InstanceNumber = file_ds.InstanceNumber
                        ds.DirectoryRecordSequence.append(record)
                    except Exception:
                        _logger.debug(
                            "Skipping %s in DICOMDIR index", file_path, exc_info=True
                        )

            # Save DICOMDIR
            dicomdir_path = os.path.join(dicom_folder, 'DICOMDIR')
            ds.save_as(dicomdir_path, write_like_original=False)
            return True
        except Exception:
            _logger.exception("Could not create DICOMDIR in %s", dicom_folder)
            return False

    def _create_simple_index(self, dicom_folder):
        """Write ``INDEX.txt`` listing the exported files (DICOMDIR fallback).

        Best effort: an I/O error is logged and otherwise ignored.
        """
        index_path = os.path.join(dicom_folder, 'INDEX.txt')
        try:
            with open(index_path, 'w', encoding='utf-8') as f:
                f.write("DICOM Export Index\n")
                f.write("==================\n\n")
                for root, dirs, files in os.walk(dicom_folder):
                    dirs.sort()
                    for name in sorted(files):
                        if name in self._INDEX_FILE_NAMES:
                            continue
                        rel_path = os.path.relpath(os.path.join(root, name), dicom_folder)
                        f.write(f"{rel_path}\n")
        except OSError:
            _logger.exception("Could not write %s", index_path)