import random
import time
from abc import ABC, abstractmethod

import yaml
from tqdm import tqdm

import core
from core import distance
from entity.Line import Line
from entity.Node import Node
from entity.primitives import Tun2d, Window, Gate, Sealed, WindBridge, TunText, AirFlow, Shaft, Fan, AirDuct

# Fixed width used for tunnel text labels and for the air-flow markers placed
# along long tunnels.
standard_width = 5

# Degenerate segment used wherever a primitive needs a Line but has no real one.
_ZERO_SEGMENT = ((0, 0), (0, 0))


def _xz(point):
    """Return the ``(x, z)`` pair of a raw point dict."""
    return point["x"], point["z"]


class DictManager(ABC):
    """Base container holding the raw export and the primitives built from it.

    ``original_data`` is indexed as a mapping; this class reads its
    ``"layerMap"`` entry, which is iterated as ``[layer_id, layer_dict]``
    pairs.
    """

    def __init__(self, original_data):
        """Store the raw data, build the layer lookup and create empty containers."""
        self.original_data = original_data
        self.layer_map = self.get_layer_map()
        self.tun_dict = {}
        self.air_flow_list = []
        self.window_list = []
        self.gate_list = []
        self.fan_list = []
        self.wind_bridge_list = []
        self.air_duct_list = []
        self.sealed_list = []
        self.tun_text_list = []

    def get_layer_map(self):
        """Return ``{layer_id: layer_dict}`` built from ``original_data["layerMap"]``.

        Each layer dict is mutated in place: it receives a ``'divIndex'`` key
        holding the layer's position in the original list.
        """
        layer_map = {}
        for div_index, layer in enumerate(self.original_data["layerMap"]):
            layer[1]['divIndex'] = div_index
            layer_map[layer[0]] = layer[1]
        return layer_map


class DictManger2D(DictManager):
    """2D loader: turns the raw export into 2D drawing primitives.

    All primitives are built eagerly in ``__init__``. The order of the
    ``get_*`` calls matters: tunnels must exist before anything that looks
    them up, tunnel texts rewrite ``tun.fq`` before air flows test it, and
    sealed walls must be known before air flows are generated.
    """

    def __init__(self, original_data):
        """Build every primitive list from ``original_data``."""
        super().__init__(original_data)
        self.shaft_list = []
        self.get_tuns()
        self.get_tun_text()
        self.get_windows()
        self.get_gates()
        self.get_sealed()
        self.get_wind_bridges()
        self.get_air_flow()
        self.get_shaft()
        self.get_fans()
        self.get_duct_list()

    def get_tuns(self):
        """Build a ``Tun2d`` for every tunnel whose ``tunType`` is not ``'1'``.

        Results are stored in ``self.tun_dict`` keyed by ``tun.id``.
        """
        for node in tqdm(self.original_data["tunsMap"], desc=' 【巷道读取中】'):
            node = node[1]
            if node["tunType"] == '1':
                # tunType '1' entries are handled by get_shaft instead.
                continue

            layer = self.layer_map[node['nlayerid']]
            div_index = layer['divIndex']
            color = core.hex_to_rgb(layer['ncolor'])

            from_ = _xz(node["nfrom"])
            to_ = _xz(node["nto"])
            # Midpoint of the centre line, wrapped as a Node for the primitive.
            center = core.find_point_on_line(from_, to_, 1 / 2)
            center_node = Node(center, '2d', div_index)
            middle_line = Line((from_, to_), '2d', div_index)

            # Width is measured between the first segment of each side wall.
            first_12 = _xz(node["vec12"][0]), _xz(node["vec12"][1])
            first_34 = _xz(node["vec34"][0]), _xz(node["vec34"][1])
            width = core.min_distance_between_segments(first_12, first_34)

            # vec_list[0] is always the head line (a zero-length placeholder
            # when the node has none), followed by the vec12 and vec34 segments.
            head = node.get('headVecList')
            if head is not None:
                head_segment = _xz(head[0]), _xz(head[1])
            else:
                head_segment = _ZERO_SEGMENT
            vec_list = [Line(head_segment, '2d', div_index)]

            for side in ("vec12", "vec34"):
                points = node[side]
                # Points are consumed pairwise: (0, 1), (2, 3), ...
                for i in range(0, len(points), 2):
                    segment = _xz(points[i]), _xz(points[i + 1])
                    vec_list.append(Line(segment, '2d', div_index))

            tun = Tun2d(node["ntunid"], node["nlayerid"], node["strname"], center_node, middle_line, vec_list,
                        node['fq'], color, width, node["arrowShow"], node['nairtype'])
            self.tun_dict[tun.id] = tun

    def _process_vent_graph(self, layer_map, item_type, desc, callback):
        """Walk one kind of per-layer item and hand each one to ``callback``.

        :param layer_map: iterable of ``[layer_id, layer_dict]`` pairs (the raw
            ``original_data["layerMap"]`` list, not ``self.layer_map``)
        :param item_type: key of the item list inside each layer dict; callers
            in this class pass 'windows', 'gates', 'obfurages' or 'fans'
        :param desc: tqdm progress-bar description
        :param callback: called as ``callback(item, tun, center, color)`` for
            every item whose ``ntunid`` is present in ``self.tun_dict``; items
            whose tunnel is unknown are skipped
        """
        for layer in tqdm(layer_map, desc=desc):
            for item in layer[1][item_type]:
                tun = self.tun_dict.get(item["ntunid"])
                if tun is None:
                    continue
                item_layer = self.layer_map[item['nlayerid']]
                center = Node((item['x'], item['z']), '2d', item_layer['divIndex'])
                color = core.hex_to_rgb(item_layer['ncolor'])
                # Delegate the primitive-specific construction to the caller.
                callback(item, tun, center, color)

    def get_windows(self):
        """Build a ``Window`` for every 'windows' item and append it to ``self.window_list``."""

        def process_window(window, tun, center, color):
            self.window_list.append(Window(
                window['id'],
                window["nlayerid"],
                window["strname"],
                center,
                tun.middle_line,
                tun.width,
                color
            ))

        self._process_vent_graph(self.original_data["layerMap"], 'windows', '【风窗读取中】', process_window)

    def get_gates(self):
        """Build a ``Gate`` for every 'gates' item and append it to ``self.gate_list``."""

        def process_gate(gate, tun, center, color):
            self.gate_list.append(Gate(
                gate['id'],
                gate["nlayerid"],
                gate["strname"],
                center,
                tun.middle_line,
                tun.width,
                color
            ))

        self._process_vent_graph(self.original_data["layerMap"], 'gates', '【风门读取中】', process_gate)

    def get_sealed(self):
        """Build a ``Sealed`` for every 'obfurages' item and append it to ``self.sealed_list``.

        The owning tunnel id is passed along so ``get_air_flow`` can skip
        sealed tunnels.
        """

        def process_sealed(sealed, tun, center, color):
            self.sealed_list.append(Sealed(
                sealed['id'],
                sealed["nlayerid"],
                sealed["strname"],
                center,
                tun.middle_line,
                tun.width,
                color,
                tun.id
            ))

        self._process_vent_graph(self.original_data["layerMap"], 'obfurages', '【密闭读取中】', process_sealed)

    def get_wind_bridges(self):
        """Build a ``WindBridge`` for every 'crossNodes' entry of every layer.

        Entries whose ``tun1Id`` is not in ``self.tun_dict`` are skipped, the
        same way ``_process_vent_graph`` skips items with an unknown tunnel.
        """
        for layer in tqdm(self.original_data['layerMap'], desc='【风桥读取中】'):
            for wb in layer[1]['crossNodes']:
                tun = self.tun_dict.get(wb['tun1Id'])
                if tun is None:
                    # No tunnel to take the line and width from.
                    continue
                wb_layer = self.layer_map[wb['layerId']]
                # The cross point uses x and the negated y, unlike the x/z
                # pairs read elsewhere in this class.
                cross = wb['crossPoint']
                center = Node((cross['x'], -cross['y']), '2d', wb_layer['divIndex'])
                color = core.hex_to_rgb(wb_layer['ncolor'])
                self.wind_bridge_list.append(WindBridge(
                    wb['tun1Id'],
                    wb['layerId'],
                    '',
                    center,
                    tun.middle_line,
                    tun.width,
                    color
                ))

    def get_tun_text(self):
        """Build the tunnel text labels and store them in ``self.tun_text_list``.

        Side effect: every processed tunnel's ``fq`` is overwritten with the
        value shown in the label (source value times 60, rounded to one
        decimal).

        Tunnels sharing the same name, formatted flow and centre-line angle are
        merged into one label spanning the two farthest end points of the
        group. A label is kept only when its estimated length fits within 80%
        of that span.
        """
        tun_text_dict = {}
        for node in tqdm(self.original_data["tunsMap"], desc=' 【巷道文字读取中】'):
            node = node[1]
            if node["tunType"] == '1':
                continue
            tun = self.tun_dict.get(node["ntunid"])
            if tun is None:
                # Nothing was registered for this node, so there is nothing to label.
                continue

            # 'nusingtype' == "1" selects the 'freportq' value over 'fq'.
            raw_fq = node['freportq'] if node['nusingtype'] == "1" else node['fq']
            formatted_fq = round(raw_fq * 60, 1)
            tun.fq = formatted_fq

            text = node["strname"] + " " + str(formatted_fq) + "m³/min"
            angle = core.calculate_angle_with_x_axis(tun.middle_line.original_line[0],
                                                     tun.middle_line.original_line[1])
            tun_key = text + str(angle)

            if tun_key not in tun_text_dict:
                # First tunnel of this group: create its label record.
                tun_text_dict[tun_key] = {
                    "id": tun.id,
                    "text": text,
                    "name": tun.name,
                    "layer_id": tun.layer_id,
                    "width": standard_width,
                    "color": tun.color,
                    "value": formatted_fq,
                    "tun_middle_lines": []
                }
            # Collect both end points of this tunnel for the group.
            tun_text_dict[tun_key]['tun_middle_lines'].extend([_xz(node["nfrom"]), _xz(node["nto"])])

        for tun_text in tqdm(tun_text_dict.values(), desc=' 【巷道文字读取中】'):
            farthest = core.farthest_points(tun_text['tun_middle_lines'])
            div_index = self.layer_map[tun_text['layer_id']]['divIndex']
            middle_line = Line(farthest, '2d', div_index)
            # Midpoint of the merged span, wrapped as a Node for the primitive.
            center = core.find_point_on_line(farthest[0], farthest[1], 1 / 2)
            center_node = Node(center, '2d', div_index)
            tt = TunText(tun_text['id'], tun_text['layer_id'], tun_text['text'], center_node, middle_line,
                         tun_text['width'], tun_text['color'], tun_text['value'])

            tun_length = core.distance(middle_line.agg_line[0], middle_line.agg_line[1])
            # Estimated label length: one 'width' unit per character.
            text_len = len(tun_text['text']) * float(tun_text['width'])
            if tun_length * 0.8 > text_len:
                self.tun_text_list.append(tt)

    def get_air_flow(self):
        """Build the ``AirFlow`` markers and store them in ``self.air_flow_list``.

        A tunnel gets no marker when its arrow is hidden, when it owns a sealed
        wall, when its ``fq`` is 0, or when it is shorter than 7 widths.
        Tunnels shorter than 50 widths get one marker at their centre. Longer
        ones get markers on the interior points returned by
        ``core.divide_segment``.
        """
        # A set keeps the per-tunnel membership test O(1).
        sealed_tun_ids = {obj.from_tun_id for obj in self.sealed_list}
        for tun in tqdm(self.tun_dict.values(), desc='【风流读取中】'):
            if tun.arrow_show == 0:
                continue
            if tun.id in sealed_tun_ids:
                continue  # sealed tunnels get no air-flow marker
            if tun.fq == 0:
                continue

            start, end = tun.middle_line.original_line[0], tun.middle_line.original_line[1]
            tun_len = core.distance(start, end)
            if tun_len < 7 * tun.width:
                continue

            if tun_len < 50 * tun.width:
                self.air_flow_list.append(
                    AirFlow(core.get_random_id(), tun.layer_id, '', tun.center, tun.middle_line, tun.width,
                            tun.color, tun.air_type))
                continue

            div_index = self.layer_map[tun.layer_id]['divIndex']
            space = int(tun_len // (50 * tun.width))
            # Drop the first and last points so markers never sit on the ends.
            for point in core.divide_segment(start, end, space)[1:-1]:
                center = Node(point, '2d', div_index)
                # Markers on long tunnels use the fixed standard width.
                self.air_flow_list.append(
                    AirFlow(core.get_random_id(), tun.layer_id, '', center, tun.middle_line, standard_width,
                            tun.color, tun.air_type))

    def get_shaft(self):
        """Build a ``Shaft`` for every tunnel whose ``tunType`` is ``'1'``.

        These are the entries ``get_tuns`` skips. Results go to
        ``self.shaft_list``.
        """
        for node in tqdm(self.original_data["tunsMap"], desc=' 【立井读取中】'):
            node = node[1]
            if node["tunType"] != '1':
                continue
            layer = self.layer_map[node["nlayerid"]]
            center = Node(_xz(node["nfrom"]), '2d', layer['divIndex'])
            color = core.hex_to_rgb(layer['ncolor'])
            # NOTE(review): the placeholder line is built with div index 0 rather
            # than the layer's own index; kept as in the original, confirm intent.
            none_line = Line(_ZERO_SEGMENT, '2d', 0)
            self.shaft_list.append(
                Shaft(node["ntunid"], node["nlayerid"], node["strname"], center, none_line, node["fwidth"], color))

    def get_fans(self):
        """Build a ``Fan`` for every 'fans' item and append it to ``self.fan_list``."""

        def process(fan, tun, center, color):
            self.fan_list.append(Fan(
                fan['id'],
                fan["nlayerid"],
                fan["strname"],
                center,
                tun.middle_line,
                tun.width,
                color,
                fan['strtype']
            ))

        self._process_vent_graph(self.original_data["layerMap"], 'fans', '【风扇扇读取中】', process)

    def get_duct_list(self):
        """Build an ``AirDuct`` for every fan whose ``strtype`` contains 'fanlocal'.

        The duct geometry comes from the fan's ``localFanCylinderPath``. Its
        centre node and middle line are zero placeholders. Fans whose tunnel is
        not in ``self.tun_dict`` are skipped. Results go to
        ``self.air_duct_list``.
        """
        for layer in tqdm(self.original_data["layerMap"], desc=' 【风筒读取中】'):
            for f in layer[1]["fans"]:
                if 'fanlocal' not in f['strtype']:
                    continue
                tun = self.tun_dict.get(f["ntunid"])
                if tun is None:
                    continue
                div_index = self.layer_map[f['nlayerid']]['divIndex']
                path_list = [
                    Line((_xz(path[0]), _xz(path[1])), '2d', div_index)
                    for path in f['localFanCylinderPath']
                ]
                no_center = Node((0, 0), '2d', div_index)
                no_line = Line(_ZERO_SEGMENT, '2d', div_index)
                self.air_duct_list.append(
                    AirDuct(f['id'], f['nlayerid'], f['strname'], no_center, no_line, tun.width, tun.color,
                            path_list))