2024-08-13 给RD.dcm添加DVH Sequence (2)

# Rays_export_plans_dvh.py
# 这个代码是在RS9A里运行的,用来导出pat_id.csv里面患者列表的plan dose和DVH
# pat_id.csv里面的第一行是‘patient_id’, 从第二行每行是id
# 目前的bin_size = 1.0 cGy

import os
import math
import json
import csv
import zipfile
import shutil
from connect import *

# DVH导出功能
def export_dvh_data(plan, output_file):
    bin_size = 1  # [cGy]

    # 获取计划剂量
    if platform.python_implementation() == 'IronPython':
        plan_dose = [d for d in plan.TreatmentCourse.TotalDose.DoseValues.DoseData]
    else:
        plan_dose = plan.TreatmentCourse.TotalDose.DoseValues.DoseData.flatten()

    # 计算n_bins,并确保包含0 cGy
    n_bins = int(math.ceil(max(plan_dose) / bin_size)) + 1
    dose_values = [0] + [bin_size * i for i in range(1, n_bins)]

    # 保存所有ROI的DVH数据到同一个txt文件
    with open(output_file, 'w') as f:
        structure_set = plan.GetStructureSet()
        roi_names = [r.OfRoi.Name for r in structure_set.RoiGeometries if r.PrimaryShape is not None]
        plan.TreatmentCourse.TotalDose.UpdateDoseGridStructures()

        for roi in roi_names:
            # 获取指定剂量区间下的累积相对体积
            bin_rel_vol = plan.TreatmentCourse.TotalDose.GetRelativeVolumeAtDoseValues(RoiName=roi, DoseValues=dose_values)

            # 获取ROI的总体积
            roi_volume = plan.TreatmentCourse.TotalDose.GetDoseGridRoi(RoiName=roi).RoiVolumeDistribution.TotalVolume

            # 计算累积体积 (cm³)
            cumulative_dvh = [volume * roi_volume for volume in bin_rel_vol]
            
            dgr = plan.TreatmentCourse.TotalDose.GetDoseGridRoi(RoiName=roi)
            roi_dose = [plan_dose[vi] for vi in dgr.RoiVolumeDistribution.VoxelIndices]
            
            # 计算DVH的最小剂量、最大剂量和平均剂量
            dvh_minimum_dose = min(roi_dose) / 100.0  # 转换为 Gy
            dvh_maximum_dose = max(roi_dose) / 100.0  # 转换为 Gy
            dvh_mean_dose = sum(roi_dose) / len(roi_dose) / 100.0  # 平均剂量,转换为 Gy

            # 写入该ROI的DVH头部信息
            f.write("ROI Name: {}\n".format(roi))
            f.write("DVH Type: CUMULATIVE\n")
            f.write("Dose Units: GY\n")
            f.write("Dose Type: PHYSICAL\n")
            f.write("DVH Dose Scaling: 1.0\n")
            f.write("DVH Volume Units: CM3\n")
            f.write("DVH Number of Bins: {}\n".format(len(cumulative_dvh)))
            f.write("DVH Minimum Dose (Gy): {}\n".format(dvh_minimum_dose))
            f.write("DVH Maximum Dose (Gy): {}\n".format(dvh_maximum_dose))
            f.write("DVH Mean Dose (Gy): {}\n".format(dvh_mean_dose))
            f.write("DVH Data (Dose:Volume in cm3):\n")
            
            # 写入DVH数据
            for i in range(n_bins):
                dose_value = dose_values[i] / 100.0  # 转换为Gy
                volume_value = cumulative_dvh[i]
                f.write("{}:{}\n".format(dose_value, volume_value))
            
            f.write("End of ROI\n")  # 用于标识一个ROI的结束

    print("DVH data saved to: {}".format(output_file))


# DICOM导出功能
def dicom_export_with_dvh(csv_file, dcm_folder, plan_name, case_name, target_name):
    patient_db = get_current('PatientDB')
    
    # 解析CSV文件并执行DICOM导出
    with open(csv_file, mode='r') as file:
        csv_reader = csv.DictReader(file)
        
        for row in csv_reader:
            patient_id = row['patient_id']
            export_folder = os.path.join(dcm_folder, patient_id)
            output_dvh_file = os.path.join(export_folder, "All_ROI_DVH_data.txt")
            # 检查目录是否存在,如果不存在则创建它
            if not os.path.exists(export_folder):
                os.makedirs(export_folder)
            # 检查是否已经存在相应的ZIP文件
            zip_filename = '{}.zip'.format(export_folder)
            
            if (not os.path.exists(zip_filename)) and (not os.listdir(export_folder)):
                pat = patient_db.QueryPatientInfo(Filter={'PatientID': patient_id})
                if len(pat) > 0:
                    patient = patient_db.LoadPatient(PatientInfo=pat[0])
                    case = patient.Cases[case_name]
                    case.SetCurrent()
                    
                    iplan = case.QueryPlanInfo(Filter={'Name': plan_name})
                    if len(iplan) > 0:
                        plan = case.LoadPlan(PlanInfo=iplan[0])
                        
                        # 导出DVH数据
                        export_dvh_data(plan, output_dvh_file)
                        
                        # 开始DICOM导出
                        examination = get_current('Examination')
                        beamset = get_current('BeamSet')
                        roi_list = [r.Name for r in case.PatientModel.RegionsOfInterest]
                        if 'PTV_3cm' not in roi_list:
                            retval_0 = case.PatientModel.CreateRoi(Name=r"PTV_3cm", Color="Magenta", Type="Organ", TissueName=None, RbeCellTypeName=None, RoiMaterial=None)
                            retval_0.SetAlgebraExpression(ExpressionA={ 'Operation': "Union", 'SourceRoiNames': [target_name], 'MarginSettings': { 'Type': "Expand", 'Superior': 3, 'Inferior': 3, 'Anterior': 3, 'Posterior': 3, 'Right': 3, 'Left': 3 } }, ExpressionB={ 'Operation': "Union", 'SourceRoiNames': [], 'MarginSettings': { 'Type': "Expand", 'Superior': 0, 'Inferior': 0, 'Anterior': 0, 'Posterior': 0, 'Right': 0, 'Left': 0 } }, ResultOperation="None", ResultMarginSettings={ 'Type': "Expand", 'Superior': 0, 'Inferior': 0, 'Anterior': 0, 'Posterior': 0, 'Right': 0, 'Left': 0 })
                            retval_0.UpdateDerivedGeometry(Examination=examination, Algorithm="Auto")
                            plan.TreatmentCourse.TotalDose.UpdateDoseGridStructures()
                            patient.Save()                        
                        try:
                            result = case.ScriptableDicomExport(ExportFolderPath=export_folder,
                                Examinations=[examination.Name],
                                RtStructureSetsForExaminations=[examination.Name],
                                BeamSets=[beamset.BeamSetIdentifier()],
                                BeamSetDoseForBeamSets=[beamset.BeamSetIdentifier()],
                                BeamDosesForBeamSets=[], #BeamDosesForBeamSets=[beamset.BeamSetIdentifier()],
                                RtStructureSetsReferencedFromBeamSets=[beamset.BeamSetIdentifier()],
                                DicomFilter="",
                                IgnorePreConditionWarnings=True)
                            LogCompleted(result)
                        except SystemError as error:
                            LogWarning(error)
                            print("\nTrying to export again with IgnorePreConditionWarnings=True to", export_folder)
                        except Exception as e:
                            print('Exception %s' % e)
                    else:
                        print("No plan found for patient {}".format(patient_id))
                else:
                    print("Patient {} not found in RayStation".format(patient_id)) 
            else:
                print("{} already exported in the folder {}".format(patient_id, export_folder))
                
            if os.listdir(export_folder) and (not os.path.exists(zip_filename)):
                # 压缩文件夹成ZIP
                with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
                    for root, _, files in os.walk(export_folder):
                        for file in files:
                            file_path = os.path.join(root, file)
                            arcname = os.path.relpath(file_path, export_folder)
                            zipf.write(file_path, arcname)                
                print('Created zip file {}'.format(zip_filename))
            # 删除源文件夹
            if os.path.exists(export_folder) and os.path.exists(zip_filename):
                shutil.rmtree(export_folder)

# 日志功能
def LogWarning(error):
    try:
        jsonWarnings = json.loads(str(error))
        print ("WARNING! Export Aborted!")
        print ("Comment:")
        print (jsonWarnings["Comment"])
        print ("Warnings:")
        for w in jsonWarnings["Warnings"]:
            print (w)
    except ValueError as error:
        print ("Error occurred. Could not export.")

def LogCompleted(result):
    try:
        jsonWarnings = json.loads(str(result))
        print ("Completed!")
        print ("Comment:")
        print (jsonWarnings["Comment"])
        print ("Warnings:")
        for w in jsonWarnings["Warnings"]:
            print (w)
        print ("Export notifications:")
        for w in jsonWarnings["ExportNotifications"]:
            print (w)
    except ValueError as error:
        print ("Error reading completion messages.")

# 配置参数并运行
dcm_folder = r'X:\dcm_temp'
csv_file = r'pat_id.csv'
plan_name = 'infi'
case_name = 'Case 1'
target_name = 'PTV'

if __name__ == '__main__':        
    dicom_export_with_dvh(csv_file, dcm_folder, plan_name, case_name, target_name)


©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容