123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- import random
- import time
- from abc import ABC, abstractmethod
- import yaml
- from tqdm import tqdm
- import core
- from core import distance
- from entity.Line import Line
- from entity.Node import Node
- from entity.primitives import Tun2d, Window, Gate, Sealed, WindBridge, TunText, AirFlow, Shaft, Fan, AirDuct
- standard_width = 5
- class DictManager(ABC):
- def __init__(self, original_data):
- self.original_data = original_data
- self.layer_map = self.get_layer_map()
- self.tun_dict = {}
- self.air_flow_list = []
- self.window_list = []
- self.gate_list = []
- self.fan_list = []
- self.wind_bridge_list = []
- self.air_duct_list = []
- self.sealed_list = []
- self.tun_text_list = []
- self.air_duct_list = []
- def get_layer_map(self):
- layer_list = self.original_data["layerMap"]
- layer_map = {}
- div_index = 0
- for layer in layer_list:
- layer[1]['divIndex'] = div_index
- layer_map[layer[0]] = layer[1]
- div_index = div_index + 1
- return layer_map
- class DictManger2D(DictManager):
- def __init__(self, original_data):
- super().__init__(original_data)
- self.shaft_list = []
- self.get_tuns()
- self.get_tun_text()
- self.get_windows()
- self.get_gates()
- self.get_sealed()
- self.get_wind_bridges()
- self.get_air_flow()
- self.get_shaft()
- self.get_fans()
- self.get_duct_list()
- def get_tuns(self):
- node_list = self.original_data["tunsMap"]
- for node in tqdm(node_list, desc=' 【巷道读取中】'):
- node = node[1]
- if node["tunType"] == '1': continue
- divIndex = self.layer_map[node['nlayerid']]['divIndex']
- color = self.layer_map[node['nlayerid']]['ncolor']
- color = core.hex_to_rgb(color)
- from_ = (node["nfrom"]["x"], node["nfrom"]["z"])
- to_ = (node["nto"]["x"], node["nto"]["z"])
- vec_middle = from_, to_
- center = core.find_point_on_line(from_, to_, 1 / 2)
- #
- center_node = Node(center, '2d', divIndex)
- middle_line = Line(vec_middle, '2d', divIndex)
- vec12_ = (node["vec12"][0]['x'], node["vec12"][0]['z']), (node["vec12"][1]['x'], node["vec12"][1]['z'])
- vec34_ = (node["vec34"][0]['x'], node["vec34"][0]['z']), (node["vec34"][1]['x'], node["vec34"][1]['z'])
- width = core.min_distance_between_segments(vec12_, vec34_)
- vec_list = []
- if 'headVecList' in node and node['headVecList'] is not None:
- vec_head_ = (node['headVecList'][0]["x"], node['headVecList'][0]["z"]), (
- node['headVecList'][1]["x"], node['headVecList'][1]["z"])
- vec_head_line = Line(vec_head_, '2d', divIndex)
- else:
- NoneLine = (0, 0), (0, 0)
- vec_head_line = Line(NoneLine, '2d', divIndex)
- vec_list.append(vec_head_line)
- for i in range(0, len(node["vec12"]), 2):
- vec12_ = (node["vec12"][i]['x'], node["vec12"][i]['z']), (
- node["vec12"][i + 1]['x'], node["vec12"][i + 1]['z'])
- vec12_line = Line(vec12_, '2d', divIndex)
- vec_list.append(vec12_line)
- for i in range(0, len(node["vec34"]), 2):
- vec34_ = (node["vec34"][i]['x'], node["vec34"][i]['z']), (
- node["vec34"][i + 1]['x'], node["vec34"][i + 1]['z'])
- vec34_line = Line(vec34_, '2d', divIndex)
- vec_list.append(vec34_line)
- tun = Tun2d(node["ntunid"], node["nlayerid"], node["strname"], center_node, middle_line, vec_list,
- node['fq'], color, width, node["arrowShow"], node['nairtype'])
- self.tun_dict[tun.id] = tun
- def _process_vent_graph(self, layer_map, item_type, desc, callback):
- """
- 通用方法:处理 layer_map 中的 items(windows 或 gates)
- :param layer_map: 图层映射数据
- :param item_type: 项目类型('windows' 或 'gates')
- :param desc: tqdm 进度条描述
- :param callback: 处理每个项目的回调函数
- """
- for layer in tqdm(layer_map, desc=desc):
- items = layer[1][item_type]
- for item in items:
- tun = self.tun_dict.get(item["ntunid"])
- if tun is None:
- continue
- divIndex = self.layer_map[item['nlayerid']]['divIndex']
- center = Node((item['x'], item['z']), '2d', divIndex)
- color = core.hex_to_rgb(self.layer_map[item['nlayerid']]['ncolor'])
- # 调用回调函数处理具体逻辑
- callback(item, tun, center, color)
- def get_windows(self):
- """
- 获取风窗数据
- """
- def process_window(window, tun, center, color):
- window_obj = Window(
- window['id'], window["nlayerid"], window["strname"],
- center, tun.middle_line, tun.width, color
- )
- self.window_list.append(window_obj)
- self._process_vent_graph(self.original_data["layerMap"], 'windows', '【风窗读取中】', process_window)
- def get_gates(self):
- """
- 获取门数据
- """
- def process_gate(gate, tun, center, color):
- gate_obj = Gate(
- gate['id'], gate["nlayerid"], gate["strname"],
- center, tun.middle_line, tun.width, color
- )
- self.gate_list.append(gate_obj)
- self._process_vent_graph(self.original_data["layerMap"], 'gates', '【风门读取中】', process_gate)
- def get_sealed(self):
- def process_sealed(sealed, tun, center, color):
- sealed_obj = Sealed(
- sealed['id'], sealed["nlayerid"], sealed["strname"],
- center, tun.middle_line, tun.width, color, tun.id
- )
- self.sealed_list.append(sealed_obj)
- self._process_vent_graph(self.original_data["layerMap"], 'obfurages', '【密闭读取中】', process_sealed)
- def get_wind_bridges(self):
- for layer in tqdm(self.original_data['layerMap'], '【风桥读取中】'):
- layer = layer[1]
- wb_list = layer['crossNodes']
- for wb in wb_list:
- divIndex = self.layer_map[wb['layerId']]['divIndex']
- center = Node((wb['crossPoint']['x'], -wb['crossPoint']['y']), '2d', divIndex)
- tun = self.tun_dict.get(wb['tun1Id'])
- color = self.layer_map[wb['layerId']]['ncolor']
- color = core.hex_to_rgb(color)
- wb_obj = WindBridge(
- wb['tun1Id'], wb['layerId'], '', center, tun.middle_line, tun.width, color
- )
- self.wind_bridge_list.append(wb_obj)
- def get_tun_text(self):
- node_list = self.original_data["tunsMap"]
- tun_text_dict = {}
- for node in tqdm(node_list, desc=' 【巷道文字读取中】'):
- node = node[1]
- if node["tunType"] == '1': continue
- tun = self.tun_dict.get(node["ntunid"])
- if node['nusingtype'] == "1":
- fq = node['freportq'] * 60
- else:
- fq = node['fq'] * 60
- formatted_fq = round(fq, 1)
- tun.fq = formatted_fq
- fq_text = str(formatted_fq) + "m³/min"
- fq_v = node['fq']
- tun_key = node["strname"] + " " + fq_text + str(
- core.calculate_angle_with_x_axis(tun.middle_line.original_line[0], tun.middle_line.original_line[1]))
- if tun_key not in tun_text_dict:
- # 如果不在,则初始化一个新的空列表作为值
- tun_text_dict[tun_key] = {
- "id": tun.id,
- "text": node["strname"] + " " + fq_text,
- "name": tun.name,
- "layer_id": tun.layer_id,
- "width": standard_width,
- "color": tun.color,
- "value": formatted_fq,
- "tun_middle_lines": []
- }
- from_ = (node["nfrom"]["x"], node["nfrom"]["z"])
- to_ = (node["nto"]["x"], node["nto"]["z"])
- # 将 from_ 和 to_ 坐标添加到 tun_text 对应的列表中
- tun_text_dict[tun_key]['tun_middle_lines'].extend([from_, to_])
- for tun_text in tqdm(tun_text_dict.values(), desc=' 【巷道文字读取中】'):
- tun_middle_lines = tun_text['tun_middle_lines']
- farthest = core.farthest_points(tun_middle_lines)
- divIndex = self.layer_map[tun_text['layer_id']]['divIndex']
- middle_line = Line(farthest, '2d', divIndex)
- center = core.find_point_on_line(farthest[0], farthest[1], 1 / 2)
- #
- center_node = Node(center, '2d', divIndex)
- tt = TunText(tun_text['id'], tun_text['layer_id'], tun_text['text'], center_node, middle_line,
- tun_text['width'], tun_text['color'], tun_text['value'])
- tun_length = core.distance(middle_line.agg_line[0], middle_line.agg_line[1])
- text_len = len(tun_text['text']) * float(tun_text['width'])
- if tun_length * 0.8 > text_len:
- self.tun_text_list.append(tt)
- def get_air_flow(self):
- sealed_tun_ids = [obj.from_tun_id for obj in self.sealed_list]
- for tun in tqdm(self.tun_dict.values(), '【风流读取中】'):
- if tun.arrow_show == 0: continue;
- if tun.id in sealed_tun_ids: continue # 密闭空间不画风流
- if tun.fq == 0: continue
- tun_len = core.distance(tun.middle_line.original_line[0], tun.middle_line.original_line[1])
- if tun_len < 7 * tun.width: continue
- divIndex = self.layer_map[tun.layer_id]['divIndex']
- if tun_len < 50 * tun.width:
- center = tun.center
- air_flow = AirFlow(core.get_random_id(), tun.layer_id, '', center, tun.middle_line, tun.width,
- tun.color, tun.air_type)
- self.air_flow_list.append(air_flow)
- else:
- space = int(tun_len // (50 * tun.width))
- points = core.divide_segment(tun.middle_line.original_line[0], tun.middle_line.original_line[1], space)[
- 1:-1]
- for i in range(len(points)):
- center = Node(points[i], '2d', divIndex)
- air_flow = AirFlow(core.get_random_id(), tun.layer_id, '', center, tun.middle_line, standard_width,
- tun.color,
- tun.air_type)
- self.air_flow_list.append(air_flow)
- def get_shaft(self):
- node_list = self.original_data["tunsMap"]
- for node in tqdm(node_list, desc=' 【立井读取中】'):
- node = node[1]
- if node["tunType"] == '1':
- divIndex = self.layer_map[node["nlayerid"]]['divIndex']
- center = Node((node["nfrom"]["x"], node["nfrom"]["z"]), '2d', divIndex)
- color = self.layer_map[node['nlayerid']]['ncolor']
- color = core.hex_to_rgb(color)
- none_line = Line(((0, 0), (0, 0)), '2d', 0)
- shaft = Shaft(node["ntunid"], node["nlayerid"], node["strname"], center, none_line, node["fwidth"],
- color)
- self.shaft_list.append(shaft)
- def get_fans(self):
- def process(fan, tun, center, color):
- fan_obj = Fan(
- fan['id'], fan["nlayerid"], fan["strname"],
- center, tun.middle_line, tun.width, color, fan['strtype']
- )
- self.fan_list.append(fan_obj)
- self._process_vent_graph(self.original_data["layerMap"], 'fans', '【风扇扇读取中】', process)
- def get_duct_list(self):
- layer_map = self.original_data["layerMap"]
- for layer in tqdm(layer_map, desc=' 【风筒读取中】'):
- fans = layer[1]["fans"]
- for f in fans:
- if 'fanlocal' not in f['strtype']: continue
- tun = self.tun_dict.get(f["ntunid"])
- if tun is None: continue
- path_list = []
- divIndex = self.layer_map[f['nlayerid']]['divIndex']
- for path in f['localFanCylinderPath']:
- from_to = (path[0]['x'], path[0]['z']), (path[1]['x'], path[1]['z'])
- path_line = Line(from_to, '2d', divIndex)
- path_list.append(path_line)
- no_center = Node((0, 0), '2d', divIndex)
- no_line = Line(((0, 0), (0, 0)), '2d', divIndex)
- air_duct = AirDuct(f['id'], f['nlayerid'], f['strname'], no_center, no_line, tun.width, tun.color,
- path_list)
- self.air_duct_list.append(air_duct)
|