# generated from LouisMazin/PythonApplicationTemplate
import os
|
|
from reportlab.pdfgen import canvas
|
|
from reportlab.lib.pagesizes import A4
|
|
import numpy as np
|
|
from PIL import Image
|
|
import app.utils.image_editor as image_editor
|
|
from PyQt6.QtCore import QObject, pyqtSignal
|
|
import pydicom
|
|
from pydicom.uid import generate_uid
|
|
from datetime import datetime
|
|
|
|
class ExportCategory:
    """Palette of RGB colour tuples used to tag export groups.

    Each category is an (R, G, B) tuple; CATEGORY_NAMES maps every
    tuple to a human-readable colour name.
    """

    A = (180, 0, 0)
    B = (0, 180, 0)
    C = (0, 0, 180)
    D = (180, 180, 0)
    E = (180, 0, 180)
    F = (0, 180, 180)
    G = (120, 60, 0)
    H = (60, 120, 0)
    I = (0, 120, 60)
    J = (120, 0, 60)

    # Readable names for the categories
    CATEGORY_NAMES = {
        A: "Rouge",
        B: "Vert",
        C: "Bleu",
        D: "Jaune",
        E: "Magenta",
        F: "Cyan",
        G: "Marron",
        H: "Olive",
        I: "Teal",
        J: "Bordeaux",
    }

    @classmethod
    def list(cls):
        """Return every category tuple, in declaration order."""
        # CATEGORY_NAMES was built from A..J in order, so its keys
        # iterate in exactly the same sequence as the attributes.
        return [*cls.CATEGORY_NAMES]

    @classmethod
    def get_name(cls, category):
        """Return the readable name for *category*, or a generic label."""
        return cls.CATEGORY_NAMES.get(category, f"Groupe_{category}")
|
|
|
|
class ExportManager(QObject):
    """Coordinates exporting DICOM data to PDF/PNG/JSON/XLS/DICOMDIR.

    Emits Qt signals so a GUI can track export progress without polling.
    """

    # Signals for progress tracking
    progress_changed = pyqtSignal(int)   # percentage, 0-100
    phase_changed = pyqtSignal(str)      # phase identifier, e.g. "exporting_pdf"
    export_finished = pyqtSignal(bool)  # True = success, False = error

    def __init__(self, dicom_manager):
        """Store the DICOM manager that supplies the entries to export."""
        super().__init__()
        self.export_data = []  # Placeholder for data to export
        self.dicom_manager = dicom_manager
        self.export_destination = None  # Destination folder for exports
|
|
|
|
    def set_export_data(self, data):
        """Store the data that subsequent export calls will use."""
        self.export_data = data
|
|
|
|
    def set_export_destination(self, destination_path):
        """Set the destination folder for exports (None falls back to the CWD)."""
        self.export_destination = destination_path
|
|
|
|
def get_export_folder_name(self, root_name, category):
|
|
"""Generate export folder name with readable category name"""
|
|
category_name = ExportCategory.get_name(category)
|
|
return f"{root_name}_{category_name}"
|
|
|
|
def get_export_dicom_list(self):
|
|
result = []
|
|
for parent in self.dicom_manager.get_dicoms():
|
|
if parent is None or parent.is_empty(): # Skip None or empty entries
|
|
continue
|
|
|
|
subfolders = parent.get_subfolders()
|
|
if subfolders:
|
|
result.append([parent, subfolders])
|
|
else:
|
|
# For single files without subfolders, create a list with the parent itself
|
|
result.append([parent, [parent]])
|
|
return result
|
|
|
|
def fetch_export_data(self):
|
|
result = []
|
|
for parent in self.dicom_manager.get_dicoms():
|
|
if parent is None or parent.is_empty(): # Skip None or empty entries
|
|
continue
|
|
|
|
subfolders = parent.get_subfolders()
|
|
|
|
# Handle case where there are no subfolders (single file DICOMs)
|
|
if not subfolders:
|
|
if parent.get_export(): # Check if the parent itself is marked for export
|
|
result.append([parent, [parent]])
|
|
continue
|
|
|
|
# Filter only checked subfolders
|
|
checked_subfolders = [subfolder for subfolder in subfolders if subfolder.get_export()]
|
|
|
|
if not checked_subfolders:
|
|
continue
|
|
|
|
# Group subfolders by category
|
|
categories = {}
|
|
for subfolder in checked_subfolders:
|
|
category = subfolder.get_category()
|
|
if category not in categories:
|
|
categories[category] = []
|
|
categories[category].append(subfolder)
|
|
|
|
# Create separate lists for each category
|
|
for category, category_subfolders in categories.items():
|
|
result.append([parent, category_subfolders])
|
|
return result
|
|
|
|
def export_images_as_pdf(self):
|
|
"""Export images as PDF with progress tracking"""
|
|
self.phase_changed.emit("preparing_export")
|
|
data = self.fetch_export_data()
|
|
if not data:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
try:
|
|
total_files = 0
|
|
# Count total files for progress calculation
|
|
for liste in data:
|
|
subfolders = liste[1]
|
|
for subfolder in subfolders:
|
|
files = subfolder.get_files()
|
|
total_files += len(files)
|
|
|
|
if total_files == 0:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
self.phase_changed.emit("exporting_pdf")
|
|
processed_files = 0
|
|
|
|
for liste in data:
|
|
root = liste[0]
|
|
subfolders = liste[1]
|
|
|
|
# Use readable category name and destination
|
|
folder_name = self.get_export_folder_name(root.get_name(), subfolders[0].get_category())
|
|
base_path = self.export_destination if self.export_destination else os.getcwd()
|
|
pdf_path = os.path.join(base_path, f"{folder_name}.pdf")
|
|
c = canvas.Canvas(pdf_path)
|
|
first_image = True
|
|
|
|
for subfolder in subfolders:
|
|
files = subfolder.get_files()
|
|
mask = subfolder.get_mask()
|
|
if not files:
|
|
continue
|
|
|
|
for file in files:
|
|
image_array = file.get_image()
|
|
if image_array is not None:
|
|
# Apply mask if it exists
|
|
if mask is not None:
|
|
image_array = image_editor.apply_mask(image_array, mask)
|
|
|
|
# Convert numpy array to PIL Image
|
|
if isinstance(image_array, np.ndarray):
|
|
if not first_image:
|
|
c.showPage()
|
|
first_image = False
|
|
|
|
pil_image = Image.fromarray(image_array.astype('uint8'))
|
|
img_width, img_height = pil_image.size
|
|
c.setPageSize((img_width, img_height))
|
|
c.drawInlineImage(pil_image, 0, 0, width=img_width, height=img_height)
|
|
|
|
processed_files += 1
|
|
progress = int((processed_files / total_files) * 100)
|
|
self.progress_changed.emit(progress)
|
|
|
|
c.save()
|
|
|
|
self.phase_changed.emit("export_complete")
|
|
self.export_finished.emit(True)
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
def export_images_as_png(self):
|
|
"""Export images as PNG with progress tracking"""
|
|
self.phase_changed.emit("preparing_export")
|
|
data = self.fetch_export_data()
|
|
if not data:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
try:
|
|
total_files = 0
|
|
# Count total files for progress calculation
|
|
for liste in data:
|
|
subfolders = liste[1]
|
|
for subfolder in subfolders:
|
|
files = subfolder.get_files()
|
|
total_files += len(files)
|
|
|
|
if total_files == 0:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
self.phase_changed.emit("exporting_png")
|
|
processed_files = 0
|
|
|
|
for liste in data:
|
|
root = liste[0]
|
|
subfolders = liste[1]
|
|
|
|
# Use readable category name and destination
|
|
folder_name = self.get_export_folder_name(root.get_name(), subfolders[0].get_category())
|
|
base_path = self.export_destination if self.export_destination else os.getcwd()
|
|
png_folder = os.path.join(base_path, folder_name)
|
|
os.makedirs(png_folder, exist_ok=True)
|
|
|
|
for subfolder in subfolders:
|
|
files = subfolder.get_files()
|
|
mask = subfolder.get_mask()
|
|
if not files:
|
|
continue
|
|
|
|
for file in files:
|
|
image_array = file.get_image()
|
|
if image_array is not None:
|
|
# Apply mask if it exists
|
|
if mask is not None:
|
|
image_array = image_editor.apply_mask(image_array, mask)
|
|
|
|
# Convert numpy array to PIL Image
|
|
if isinstance(image_array, np.ndarray):
|
|
pil_image = Image.fromarray(image_array.astype('uint8'))
|
|
# Keep original filename
|
|
original_name = os.path.splitext(file.get_name())[0]
|
|
png_path = os.path.join(png_folder, f"{original_name}.png")
|
|
pil_image.save(png_path)
|
|
|
|
processed_files += 1
|
|
progress = int((processed_files / total_files) * 100)
|
|
self.progress_changed.emit(progress)
|
|
|
|
self.phase_changed.emit("export_complete")
|
|
self.export_finished.emit(True)
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
def export_metadata_as_json(self):
|
|
"""Export metadata as JSON with progress tracking"""
|
|
self.phase_changed.emit("preparing_export")
|
|
data = self.fetch_export_data()
|
|
if not data:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
try:
|
|
total_subfolders = sum(len(liste[1]) for liste in data)
|
|
if total_subfolders == 0:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
self.phase_changed.emit("exporting_json")
|
|
processed_subfolders = 0
|
|
|
|
for liste in data:
|
|
root = liste[0]
|
|
subfolders = liste[1]
|
|
|
|
# Use readable category name and destination
|
|
folder_name = self.get_export_folder_name(root.get_name(), subfolders[0].get_category())
|
|
base_path = self.export_destination if self.export_destination else os.getcwd()
|
|
json_folder = os.path.join(base_path, folder_name)
|
|
os.makedirs(json_folder, exist_ok=True)
|
|
|
|
for subfolder in subfolders:
|
|
try:
|
|
subfolder.export_metadata_json(json_folder)
|
|
except Exception as e:
|
|
# Continue with other subfolders even if one fails
|
|
continue
|
|
|
|
processed_subfolders += 1
|
|
progress = int((processed_subfolders / total_subfolders) * 100)
|
|
self.progress_changed.emit(progress)
|
|
|
|
self.phase_changed.emit("export_complete")
|
|
self.export_finished.emit(True)
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
def export_metadata_as_xls(self):
|
|
"""Export metadata as XLS with progress tracking"""
|
|
self.phase_changed.emit("preparing_export")
|
|
data = self.fetch_export_data()
|
|
if not data:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
try:
|
|
total_subfolders = sum(len(liste[1]) for liste in data)
|
|
if total_subfolders == 0:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
self.phase_changed.emit("exporting_xls")
|
|
processed_subfolders = 0
|
|
|
|
for liste in data:
|
|
root = liste[0]
|
|
subfolders = liste[1]
|
|
|
|
# Use readable category name and destination
|
|
folder_name = self.get_export_folder_name(root.get_name(), subfolders[0].get_category())
|
|
base_path = self.export_destination if self.export_destination else os.getcwd()
|
|
xls_folder = os.path.join(base_path, folder_name)
|
|
os.makedirs(xls_folder, exist_ok=True)
|
|
|
|
for subfolder in subfolders:
|
|
try:
|
|
subfolder.export_metadata_xls(xls_folder)
|
|
except Exception as e:
|
|
# Continue with other subfolders even if one fails
|
|
continue
|
|
|
|
processed_subfolders += 1
|
|
progress = int((processed_subfolders / total_subfolders) * 100)
|
|
self.progress_changed.emit(progress)
|
|
|
|
self.phase_changed.emit("export_complete")
|
|
self.export_finished.emit(True)
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.export_finished.emit(False)
|
|
return False
|
|
|
|
    def export_images_as_dicomdir(self):
        """Export selected images as a DICOMDIR tree with progress tracking.

        For each (root, category) group a "<name>_DICOMDIR" folder is
        created with one sub-folder per series; every file is re-read from
        disk, optionally masked, given fresh Study/Series/SOP UIDs, and
        saved without an extension.  A DICOMDIR index is then generated.
        Returns True on success, False otherwise (mirrored by the
        export_finished signal).
        """
        self.phase_changed.emit("preparing_export")
        data = self.fetch_export_data()
        if not data:
            self.export_finished.emit(False)
            return False

        try:
            total_files = 0
            # Count total files for progress calculation
            for liste in data:
                subfolders = liste[1]
                for subfolder in subfolders:
                    files = subfolder.get_files()
                    total_files += len(files)

            if total_files == 0:
                self.export_finished.emit(False)
                return False

            self.phase_changed.emit("exporting_dicomdir")
            processed_files = 0

            for liste in data:
                root = liste[0]
                subfolders = liste[1]

                # Use readable category name and destination
                folder_name = self.get_export_folder_name(root.get_name(), subfolders[0].get_category())
                base_path = self.export_destination if self.export_destination else os.getcwd()
                dicom_folder = os.path.join(base_path, f"{folder_name}_DICOMDIR")
                os.makedirs(dicom_folder, exist_ok=True)

                # Create DICOM directory structure with proper hierarchy:
                # one StudyInstanceUID per group, one SeriesInstanceUID per subfolder
                study_uid = generate_uid()

                for i, subfolder in enumerate(subfolders):
                    files = subfolder.get_files()
                    mask = subfolder.get_mask()
                    if not files:
                        continue

                    # Create series folder with original subfolder name
                    series_uid = generate_uid()
                    series_folder = os.path.join(dicom_folder, subfolder.get_name())
                    os.makedirs(series_folder, exist_ok=True)

                    for j, file in enumerate(files):
                        try:
                            # Skip files whose DICOM dataset was never loaded
                            if not file.ds:
                                continue

                            # Re-read from disk to get an independent copy of the dataset
                            new_ds = pydicom.dcmread(file.file_path)

                            # Apply mask to pixel data if mask exists
                            if mask is not None:
                                image_array = file.get_image()
                                if image_array is not None:
                                    # Apply mask
                                    masked_array = image_editor.apply_mask(image_array, mask)

                                    # Convert RGB back to grayscale for DICOM storage
                                    if len(masked_array.shape) == 3:
                                        # Convert RGB to grayscale using standard
                                        # luminance weights (ITU-R BT.601)
                                        grayscale = np.dot(masked_array[...,:3], [0.2989, 0.5870, 0.1140]).astype(np.uint16)
                                    else:
                                        grayscale = masked_array.astype(np.uint16)

                                    # Update pixel data and the attributes that describe it
                                    new_ds.PixelData = grayscale.tobytes()
                                    new_ds.Rows = int(grayscale.shape[0])
                                    new_ds.Columns = int(grayscale.shape[1])
                                    new_ds.BitsAllocated = 16
                                    new_ds.BitsStored = 16
                                    new_ds.HighBit = 15
                                    new_ds.SamplesPerPixel = 1
                                    new_ds.PhotometricInterpretation = 'MONOCHROME2'

                            # Update required DICOM attributes for DICOMDIR
                            new_ds.StudyInstanceUID = study_uid
                            new_ds.SeriesInstanceUID = series_uid
                            new_ds.SOPInstanceUID = generate_uid()
                            new_ds.InstanceNumber = str(j + 1)
                            new_ds.SeriesNumber = str(i + 1)

                            # Fill in attributes DICOMDIR readers expect, if absent
                            if not hasattr(new_ds, 'StudyID'):
                                new_ds.StudyID = '1'
                            if not hasattr(new_ds, 'SeriesDescription'):
                                new_ds.SeriesDescription = subfolder.get_name()
                            if not hasattr(new_ds, 'StudyDescription'):
                                new_ds.StudyDescription = root.get_name()
                            if not hasattr(new_ds, 'PatientName'):
                                new_ds.PatientName = 'Anonymous'
                            if not hasattr(new_ds, 'PatientID'):
                                new_ds.PatientID = 'ANON001'
                            if not hasattr(new_ds, 'StudyDate'):
                                new_ds.StudyDate = datetime.now().strftime('%Y%m%d')
                            if not hasattr(new_ds, 'StudyTime'):
                                new_ds.StudyTime = datetime.now().strftime('%H%M%S')

                            # Save modified DICOM file with original name
                            original_name = os.path.splitext(file.get_name())[0]
                            output_path = os.path.join(series_folder, original_name)

                            # Save without file extension for DICOMDIR compatibility
                            new_ds.save_as(output_path, write_like_original=False)

                        except Exception as e:
                            # Skip files that fail; the rest of the export continues
                            continue

                        processed_files += 1
                        # Scale to 90% so DICOMDIR creation can claim the remainder
                        progress = int((processed_files / total_files) * 90)
                        self.progress_changed.emit(progress)

            # Create DICOMDIR file
            # NOTE(review): dicom_folder here refers to the LAST group's folder,
            # so with several groups only the last one gets a DICOMDIR — confirm
            # whether this call was meant to sit inside the loop above.
            self.progress_changed.emit(95)
            success = self._create_dicomdir_file(dicom_folder)
            if not success:
                # If DICOMDIR creation fails, still consider export successful
                # but create a simple index file
                self._create_simple_index(dicom_folder)

            self.phase_changed.emit("export_complete")
            self.export_finished.emit(True)
            return True

        except Exception as e:
            # Any unexpected failure aborts the export and signals the UI
            self.export_finished.emit(False)
            return False
|
|
|
|
    def _create_dicomdir_file(self, dicom_folder):
        """Create a DICOMDIR file referencing the exported image files.

        Walks *dicom_folder*, builds an IMAGE directory record for each
        qualifying DICOM file, and saves the result as 'DICOMDIR' inside
        that folder.  Returns True on success, False on any failure
        (caller then falls back to a plain-text index).
        """
        try:
            # Create DICOMDIR manually since pydicom's FileSet might have issues
            from pydicom.dataset import Dataset, FileDataset
            from pydicom import uid

            # File meta information for a Media Storage Directory object
            file_meta = Dataset()
            file_meta.MediaStorageSOPClassUID = uid.MediaStorageDirectoryStorage
            file_meta.MediaStorageSOPInstanceUID = generate_uid()
            file_meta.ImplementationClassUID = generate_uid()
            file_meta.TransferSyntaxUID = uid.ExplicitVRLittleEndian

            # Create main dataset
            ds = FileDataset('DICOMDIR', {}, file_meta=file_meta, preamble=b"\0" * 128)
            ds.FileSetID = 'EXPORTED_STUDY'
            ds.DirectoryRecordSequence = []

            # Add directory records for each DICOM file
            for root, dirs, files in os.walk(dicom_folder):
                for file in files:
                    # NOTE(review): only filenames starting with 'IM' are indexed,
                    # but export_images_as_dicomdir saves files under their original
                    # stems — files not named 'IM*' will be silently skipped here.
                    # Confirm the intended naming convention.
                    if not file.startswith('IM'):  # Only process our DICOM files
                        continue

                    file_path = os.path.join(root, file)
                    try:
                        # Read the DICOM file to get metadata
                        file_ds = pydicom.dcmread(file_path)

                        # Create directory record
                        record = Dataset()
                        record.DirectoryRecordType = 'IMAGE'

                        # Set file reference, relative to the DICOMDIR location
                        rel_path = os.path.relpath(file_path, dicom_folder)
                        # Convert path separators for DICOM (list of components)
                        rel_path_parts = rel_path.replace('\\', '/').split('/')
                        record.ReferencedFileID = rel_path_parts

                        # Add essential metadata when present on the source file
                        if hasattr(file_ds, 'StudyInstanceUID'):
                            record.StudyInstanceUID = file_ds.StudyInstanceUID
                        if hasattr(file_ds, 'SeriesInstanceUID'):
                            record.SeriesInstanceUID = file_ds.SeriesInstanceUID
                        if hasattr(file_ds, 'SOPInstanceUID'):
                            record.ReferencedSOPInstanceUIDInFile = file_ds.SOPInstanceUID
                        if hasattr(file_ds, 'InstanceNumber'):
                            record.InstanceNumber = file_ds.InstanceNumber

                        ds.DirectoryRecordSequence.append(record)

                    except Exception as e:
                        # Unreadable files are skipped rather than failing the index
                        continue

            # Save DICOMDIR
            dicomdir_path = os.path.join(dicom_folder, 'DICOMDIR')
            ds.save_as(dicomdir_path, write_like_original=False)
            return True

        except Exception as e:
            # Caller falls back to _create_simple_index on failure
            return False
|
|
|
|
def _create_simple_index(self, dicom_folder):
|
|
"""Create a simple text index if DICOMDIR creation fails"""
|
|
try:
|
|
index_path = os.path.join(dicom_folder, 'INDEX.txt')
|
|
with open(index_path, 'w') as f:
|
|
f.write("DICOM Export Index\n")
|
|
f.write("==================\n\n")
|
|
|
|
for root, dirs, files in os.walk(dicom_folder):
|
|
for file in files:
|
|
if file.startswith('IM'):
|
|
rel_path = os.path.relpath(os.path.join(root, file), dicom_folder)
|
|
f.write(f"{rel_path}\n")
|
|
except:
|
|
pass
|
|
|
|
# Write DICOMDIR
|
|
dicomdir_path = os.path.join(dicom_folder, 'DICOMDIR')
|
|
f.write(dicomdir_path)
|
|
|