# Rays_export_plans_dvh.py
# 这个代码是在RS9A里运行的,用来导出pat_id.csv里面患者列表的plan dose和DVH
# pat_id.csv里面的第一行是‘patient_id’, 从第二行每行是id
# 目前的bin_size = 1.0 cGy
import os
import math
import json
import csv
import zipfile
import shutil
from connect import *
# DVH导出功能
def export_dvh_data(plan, output_file):
bin_size = 1 # [cGy]
# 获取计划剂量
if platform.python_implementation() == 'IronPython':
plan_dose = [d for d in plan.TreatmentCourse.TotalDose.DoseValues.DoseData]
else:
plan_dose = plan.TreatmentCourse.TotalDose.DoseValues.DoseData.flatten()
# 计算n_bins,并确保包含0 cGy
n_bins = int(math.ceil(max(plan_dose) / bin_size)) + 1
dose_values = [0] + [bin_size * i for i in range(1, n_bins)]
# 保存所有ROI的DVH数据到同一个txt文件
with open(output_file, 'w') as f:
structure_set = plan.GetStructureSet()
roi_names = [r.OfRoi.Name for r in structure_set.RoiGeometries if r.PrimaryShape is not None]
plan.TreatmentCourse.TotalDose.UpdateDoseGridStructures()
for roi in roi_names:
# 获取指定剂量区间下的累积相对体积
bin_rel_vol = plan.TreatmentCourse.TotalDose.GetRelativeVolumeAtDoseValues(RoiName=roi, DoseValues=dose_values)
# 获取ROI的总体积
roi_volume = plan.TreatmentCourse.TotalDose.GetDoseGridRoi(RoiName=roi).RoiVolumeDistribution.TotalVolume
# 计算累积体积 (cm³)
cumulative_dvh = [volume * roi_volume for volume in bin_rel_vol]
dgr = plan.TreatmentCourse.TotalDose.GetDoseGridRoi(RoiName=roi)
roi_dose = [plan_dose[vi] for vi in dgr.RoiVolumeDistribution.VoxelIndices]
# 计算DVH的最小剂量、最大剂量和平均剂量
dvh_minimum_dose = min(roi_dose) / 100.0 # 转换为 Gy
dvh_maximum_dose = max(roi_dose) / 100.0 # 转换为 Gy
dvh_mean_dose = sum(roi_dose) / len(roi_dose) / 100.0 # 平均剂量,转换为 Gy
# 写入该ROI的DVH头部信息
f.write("ROI Name: {}\n".format(roi))
f.write("DVH Type: CUMULATIVE\n")
f.write("Dose Units: GY\n")
f.write("Dose Type: PHYSICAL\n")
f.write("DVH Dose Scaling: 1.0\n")
f.write("DVH Volume Units: CM3\n")
f.write("DVH Number of Bins: {}\n".format(len(cumulative_dvh)))
f.write("DVH Minimum Dose (Gy): {}\n".format(dvh_minimum_dose))
f.write("DVH Maximum Dose (Gy): {}\n".format(dvh_maximum_dose))
f.write("DVH Mean Dose (Gy): {}\n".format(dvh_mean_dose))
f.write("DVH Data (Dose:Volume in cm3):\n")
# 写入DVH数据
for i in range(n_bins):
dose_value = dose_values[i] / 100.0 # 转换为Gy
volume_value = cumulative_dvh[i]
f.write("{}:{}\n".format(dose_value, volume_value))
f.write("End of ROI\n") # 用于标识一个ROI的结束
print("DVH data saved to: {}".format(output_file))
# DICOM导出功能
def dicom_export_with_dvh(csv_file, dcm_folder, plan_name, case_name, target_name):
patient_db = get_current('PatientDB')
# 解析CSV文件并执行DICOM导出
with open(csv_file, mode='r') as file:
csv_reader = csv.DictReader(file)
for row in csv_reader:
patient_id = row['patient_id']
export_folder = os.path.join(dcm_folder, patient_id)
output_dvh_file = os.path.join(export_folder, "All_ROI_DVH_data.txt")
# 检查目录是否存在,如果不存在则创建它
if not os.path.exists(export_folder):
os.makedirs(export_folder)
# 检查是否已经存在相应的ZIP文件
zip_filename = '{}.zip'.format(export_folder)
if (not os.path.exists(zip_filename)) and (not os.listdir(export_folder)):
pat = patient_db.QueryPatientInfo(Filter={'PatientID': patient_id})
if len(pat) > 0:
patient = patient_db.LoadPatient(PatientInfo=pat[0])
case = patient.Cases[case_name]
case.SetCurrent()
iplan = case.QueryPlanInfo(Filter={'Name': plan_name})
if len(iplan) > 0:
plan = case.LoadPlan(PlanInfo=iplan[0])
# 导出DVH数据
export_dvh_data(plan, output_dvh_file)
# 开始DICOM导出
examination = get_current('Examination')
beamset = get_current('BeamSet')
roi_list = [r.Name for r in case.PatientModel.RegionsOfInterest]
if 'PTV_3cm' not in roi_list:
retval_0 = case.PatientModel.CreateRoi(Name=r"PTV_3cm", Color="Magenta", Type="Organ", TissueName=None, RbeCellTypeName=None, RoiMaterial=None)
retval_0.SetAlgebraExpression(ExpressionA={ 'Operation': "Union", 'SourceRoiNames': [target_name], 'MarginSettings': { 'Type': "Expand", 'Superior': 3, 'Inferior': 3, 'Anterior': 3, 'Posterior': 3, 'Right': 3, 'Left': 3 } }, ExpressionB={ 'Operation': "Union", 'SourceRoiNames': [], 'MarginSettings': { 'Type': "Expand", 'Superior': 0, 'Inferior': 0, 'Anterior': 0, 'Posterior': 0, 'Right': 0, 'Left': 0 } }, ResultOperation="None", ResultMarginSettings={ 'Type': "Expand", 'Superior': 0, 'Inferior': 0, 'Anterior': 0, 'Posterior': 0, 'Right': 0, 'Left': 0 })
retval_0.UpdateDerivedGeometry(Examination=examination, Algorithm="Auto")
plan.TreatmentCourse.TotalDose.UpdateDoseGridStructures()
patient.Save()
try:
result = case.ScriptableDicomExport(ExportFolderPath=export_folder,
Examinations=[examination.Name],
RtStructureSetsForExaminations=[examination.Name],
BeamSets=[beamset.BeamSetIdentifier()],
BeamSetDoseForBeamSets=[beamset.BeamSetIdentifier()],
BeamDosesForBeamSets=[], #BeamDosesForBeamSets=[beamset.BeamSetIdentifier()],
RtStructureSetsReferencedFromBeamSets=[beamset.BeamSetIdentifier()],
DicomFilter="",
IgnorePreConditionWarnings=True)
LogCompleted(result)
except SystemError as error:
LogWarning(error)
print("\nTrying to export again with IgnorePreConditionWarnings=True to", export_folder)
except Exception as e:
print('Exception %s' % e)
else:
print("No plan found for patient {}".format(patient_id))
else:
print("Patient {} not found in RayStation".format(patient_id))
else:
print("{} already exported in the folder {}".format(patient_id, export_folder))
if os.listdir(export_folder) and (not os.path.exists(zip_filename)):
# 压缩文件夹成ZIP
with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(export_folder):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, export_folder)
zipf.write(file_path, arcname)
print('Created zip file {}'.format(zip_filename))
# 删除源文件夹
if os.path.exists(export_folder) and os.path.exists(zip_filename):
shutil.rmtree(export_folder)
# 日志功能
def LogWarning(error):
try:
jsonWarnings = json.loads(str(error))
print ("WARNING! Export Aborted!")
print ("Comment:")
print (jsonWarnings["Comment"])
print ("Warnings:")
for w in jsonWarnings["Warnings"]:
print (w)
except ValueError as error:
print ("Error occurred. Could not export.")
def LogCompleted(result):
try:
jsonWarnings = json.loads(str(result))
print ("Completed!")
print ("Comment:")
print (jsonWarnings["Comment"])
print ("Warnings:")
for w in jsonWarnings["Warnings"]:
print (w)
print ("Export notifications:")
for w in jsonWarnings["ExportNotifications"]:
print (w)
except ValueError as error:
print ("Error reading completion messages.")
# 配置参数并运行
dcm_folder = r'X:\dcm_temp'
csv_file = r'pat_id.csv'
plan_name = 'infi'
case_name = 'Case 1'
target_name = 'PTV'
if __name__ == '__main__':
dicom_export_with_dvh(csv_file, dcm_folder, plan_name, case_name, target_name)
2024-08-13 给RD.dcm添加DVH Sequence (2)
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 在古老的乡村,流传着一句谚语:“不想吃牛肉,不会去郑谷(牛市场)。”这句简单的话语,道出了市场交易的基本逻辑——供...