# dict_manager.py (13 KB)
  1. import random
  2. import time
  3. from abc import ABC, abstractmethod
  4. import yaml
  5. from tqdm import tqdm
  6. import core
  7. from core import distance
  8. from entity.Line import Line
  9. from entity.Node import Node
  10. from entity.primitives import Tun2d, Window, Gate, Sealed, WindBridge, TunText, AirFlow, Shaft, Fan, AirDuct
  11. standard_width = 5
  12. class DictManager(ABC):
  13. def __init__(self, original_data):
  14. self.original_data = original_data
  15. self.layer_map = self.get_layer_map()
  16. self.tun_dict = {}
  17. self.air_flow_list = []
  18. self.window_list = []
  19. self.gate_list = []
  20. self.fan_list = []
  21. self.wind_bridge_list = []
  22. self.air_duct_list = []
  23. self.sealed_list = []
  24. self.tun_text_list = []
  25. self.air_duct_list = []
  26. def get_layer_map(self):
  27. layer_list = self.original_data["layerMap"]
  28. layer_map = {}
  29. div_index = 0
  30. for layer in layer_list:
  31. layer[1]['divIndex'] = div_index
  32. layer_map[layer[0]] = layer[1]
  33. div_index = div_index + 1
  34. return layer_map
  35. class DictManger2D(DictManager):
  36. def __init__(self, original_data):
  37. super().__init__(original_data)
  38. self.shaft_list = []
  39. self.get_tuns()
  40. self.get_tun_text()
  41. self.get_windows()
  42. self.get_gates()
  43. self.get_sealed()
  44. self.get_wind_bridges()
  45. self.get_air_flow()
  46. self.get_shaft()
  47. self.get_fans()
  48. self.get_duct_list()
  49. def get_tuns(self):
  50. node_list = self.original_data["tunsMap"]
  51. for node in tqdm(node_list, desc=' 【巷道读取中】'):
  52. node = node[1]
  53. if node["tunType"] == '1': continue
  54. divIndex = self.layer_map[node['nlayerid']]['divIndex']
  55. color = self.layer_map[node['nlayerid']]['ncolor']
  56. color = core.hex_to_rgb(color)
  57. from_ = (node["nfrom"]["x"], node["nfrom"]["z"])
  58. to_ = (node["nto"]["x"], node["nto"]["z"])
  59. vec_middle = from_, to_
  60. center = core.find_point_on_line(from_, to_, 1 / 2)
  61. #
  62. center_node = Node(center, '2d', divIndex)
  63. middle_line = Line(vec_middle, '2d', divIndex)
  64. vec12_ = (node["vec12"][0]['x'], node["vec12"][0]['z']), (node["vec12"][1]['x'], node["vec12"][1]['z'])
  65. vec34_ = (node["vec34"][0]['x'], node["vec34"][0]['z']), (node["vec34"][1]['x'], node["vec34"][1]['z'])
  66. width = core.min_distance_between_segments(vec12_, vec34_)
  67. vec_list = []
  68. if 'headVecList' in node and node['headVecList'] is not None:
  69. vec_head_ = (node['headVecList'][0]["x"], node['headVecList'][0]["z"]), (
  70. node['headVecList'][1]["x"], node['headVecList'][1]["z"])
  71. vec_head_line = Line(vec_head_, '2d', divIndex)
  72. else:
  73. NoneLine = (0, 0), (0, 0)
  74. vec_head_line = Line(NoneLine, '2d', divIndex)
  75. vec_list.append(vec_head_line)
  76. for i in range(0, len(node["vec12"]), 2):
  77. vec12_ = (node["vec12"][i]['x'], node["vec12"][i]['z']), (
  78. node["vec12"][i + 1]['x'], node["vec12"][i + 1]['z'])
  79. vec12_line = Line(vec12_, '2d', divIndex)
  80. vec_list.append(vec12_line)
  81. for i in range(0, len(node["vec34"]), 2):
  82. vec34_ = (node["vec34"][i]['x'], node["vec34"][i]['z']), (
  83. node["vec34"][i + 1]['x'], node["vec34"][i + 1]['z'])
  84. vec34_line = Line(vec34_, '2d', divIndex)
  85. vec_list.append(vec34_line)
  86. tun = Tun2d(node["ntunid"], node["nlayerid"], node["strname"], center_node, middle_line, vec_list,
  87. node['fq'], color, width, node["arrowShow"], node['nairtype'])
  88. self.tun_dict[tun.id] = tun
  89. def _process_vent_graph(self, layer_map, item_type, desc, callback):
  90. """
  91. 通用方法:处理 layer_map 中的 items(windows 或 gates)
  92. :param layer_map: 图层映射数据
  93. :param item_type: 项目类型('windows' 或 'gates')
  94. :param desc: tqdm 进度条描述
  95. :param callback: 处理每个项目的回调函数
  96. """
  97. for layer in tqdm(layer_map, desc=desc):
  98. items = layer[1][item_type]
  99. for item in items:
  100. tun = self.tun_dict.get(item["ntunid"])
  101. if tun is None:
  102. continue
  103. divIndex = self.layer_map[item['nlayerid']]['divIndex']
  104. center = Node((item['x'], item['z']), '2d', divIndex)
  105. color = core.hex_to_rgb(self.layer_map[item['nlayerid']]['ncolor'])
  106. # 调用回调函数处理具体逻辑
  107. callback(item, tun, center, color)
  108. def get_windows(self):
  109. """
  110. 获取风窗数据
  111. """
  112. def process_window(window, tun, center, color):
  113. window_obj = Window(
  114. window['id'], window["nlayerid"], window["strname"],
  115. center, tun.middle_line, tun.width, color
  116. )
  117. self.window_list.append(window_obj)
  118. self._process_vent_graph(self.original_data["layerMap"], 'windows', '【风窗读取中】', process_window)
  119. def get_gates(self):
  120. """
  121. 获取门数据
  122. """
  123. def process_gate(gate, tun, center, color):
  124. gate_obj = Gate(
  125. gate['id'], gate["nlayerid"], gate["strname"],
  126. center, tun.middle_line, tun.width, color
  127. )
  128. self.gate_list.append(gate_obj)
  129. self._process_vent_graph(self.original_data["layerMap"], 'gates', '【风门读取中】', process_gate)
  130. def get_sealed(self):
  131. def process_sealed(sealed, tun, center, color):
  132. sealed_obj = Sealed(
  133. sealed['id'], sealed["nlayerid"], sealed["strname"],
  134. center, tun.middle_line, tun.width, color, tun.id
  135. )
  136. self.sealed_list.append(sealed_obj)
  137. self._process_vent_graph(self.original_data["layerMap"], 'obfurages', '【密闭读取中】', process_sealed)
  138. def get_wind_bridges(self):
  139. for layer in tqdm(self.original_data['layerMap'], '【风桥读取中】'):
  140. layer = layer[1]
  141. wb_list = layer['crossNodes']
  142. for wb in wb_list:
  143. divIndex = self.layer_map[wb['layerId']]['divIndex']
  144. center = Node((wb['crossPoint']['x'], -wb['crossPoint']['y']), '2d', divIndex)
  145. tun = self.tun_dict.get(wb['tun1Id'])
  146. color = self.layer_map[wb['layerId']]['ncolor']
  147. color = core.hex_to_rgb(color)
  148. wb_obj = WindBridge(
  149. wb['tun1Id'], wb['layerId'], '', center, tun.middle_line, tun.width, color
  150. )
  151. self.wind_bridge_list.append(wb_obj)
  152. def get_tun_text(self):
  153. node_list = self.original_data["tunsMap"]
  154. tun_text_dict = {}
  155. for node in tqdm(node_list, desc=' 【巷道文字读取中】'):
  156. node = node[1]
  157. if node["tunType"] == '1': continue
  158. tun = self.tun_dict.get(node["ntunid"])
  159. if node['nusingtype'] == "1":
  160. fq = node['freportq'] * 60
  161. else:
  162. fq = node['fq'] * 60
  163. formatted_fq = round(fq, 1)
  164. tun.fq = formatted_fq
  165. fq_text = str(formatted_fq) + "m³/min"
  166. fq_v = node['fq']
  167. tun_key = node["strname"] + " " + fq_text + str(
  168. core.calculate_angle_with_x_axis(tun.middle_line.original_line[0], tun.middle_line.original_line[1]))
  169. if tun_key not in tun_text_dict:
  170. # 如果不在,则初始化一个新的空列表作为值
  171. tun_text_dict[tun_key] = {
  172. "id": tun.id,
  173. "text": node["strname"] + " " + fq_text,
  174. "name": tun.name,
  175. "layer_id": tun.layer_id,
  176. "width": standard_width,
  177. "color": tun.color,
  178. "value": formatted_fq,
  179. "tun_middle_lines": []
  180. }
  181. from_ = (node["nfrom"]["x"], node["nfrom"]["z"])
  182. to_ = (node["nto"]["x"], node["nto"]["z"])
  183. # 将 from_ 和 to_ 坐标添加到 tun_text 对应的列表中
  184. tun_text_dict[tun_key]['tun_middle_lines'].extend([from_, to_])
  185. for tun_text in tqdm(tun_text_dict.values(), desc=' 【巷道文字读取中】'):
  186. tun_middle_lines = tun_text['tun_middle_lines']
  187. farthest = core.farthest_points(tun_middle_lines)
  188. divIndex = self.layer_map[tun_text['layer_id']]['divIndex']
  189. middle_line = Line(farthest, '2d', divIndex)
  190. center = core.find_point_on_line(farthest[0], farthest[1], 1 / 2)
  191. #
  192. center_node = Node(center, '2d', divIndex)
  193. tt = TunText(tun_text['id'], tun_text['layer_id'], tun_text['text'], center_node, middle_line,
  194. tun_text['width'], tun_text['color'], tun_text['value'])
  195. tun_length = core.distance(middle_line.agg_line[0], middle_line.agg_line[1])
  196. text_len = len(tun_text['text']) * float(tun_text['width'])
  197. if tun_length * 0.8 > text_len:
  198. self.tun_text_list.append(tt)
  199. def get_air_flow(self):
  200. sealed_tun_ids = [obj.from_tun_id for obj in self.sealed_list]
  201. for tun in tqdm(self.tun_dict.values(), '【风流读取中】'):
  202. if tun.arrow_show == 0: continue;
  203. if tun.id in sealed_tun_ids: continue # 密闭空间不画风流
  204. if tun.fq == 0: continue
  205. tun_len = core.distance(tun.middle_line.original_line[0], tun.middle_line.original_line[1])
  206. if tun_len < 7 * tun.width: continue
  207. divIndex = self.layer_map[tun.layer_id]['divIndex']
  208. if tun_len < 50 * tun.width:
  209. center = tun.center
  210. air_flow = AirFlow(core.get_random_id(), tun.layer_id, '', center, tun.middle_line, tun.width,
  211. tun.color, tun.air_type)
  212. self.air_flow_list.append(air_flow)
  213. else:
  214. space = int(tun_len // (50 * tun.width))
  215. points = core.divide_segment(tun.middle_line.original_line[0], tun.middle_line.original_line[1], space)[
  216. 1:-1]
  217. for i in range(len(points)):
  218. center = Node(points[i], '2d', divIndex)
  219. air_flow = AirFlow(core.get_random_id(), tun.layer_id, '', center, tun.middle_line, standard_width,
  220. tun.color,
  221. tun.air_type)
  222. self.air_flow_list.append(air_flow)
  223. def get_shaft(self):
  224. node_list = self.original_data["tunsMap"]
  225. for node in tqdm(node_list, desc=' 【立井读取中】'):
  226. node = node[1]
  227. if node["tunType"] == '1':
  228. divIndex = self.layer_map[node["nlayerid"]]['divIndex']
  229. center = Node((node["nfrom"]["x"], node["nfrom"]["z"]), '2d', divIndex)
  230. color = self.layer_map[node['nlayerid']]['ncolor']
  231. color = core.hex_to_rgb(color)
  232. none_line = Line(((0, 0), (0, 0)), '2d', 0)
  233. shaft = Shaft(node["ntunid"], node["nlayerid"], node["strname"], center, none_line, node["fwidth"],
  234. color)
  235. self.shaft_list.append(shaft)
  236. def get_fans(self):
  237. def process(fan, tun, center, color):
  238. fan_obj = Fan(
  239. fan['id'], fan["nlayerid"], fan["strname"],
  240. center, tun.middle_line, tun.width, color, fan['strtype']
  241. )
  242. self.fan_list.append(fan_obj)
  243. self._process_vent_graph(self.original_data["layerMap"], 'fans', '【风扇扇读取中】', process)
  244. def get_duct_list(self):
  245. layer_map = self.original_data["layerMap"]
  246. for layer in tqdm(layer_map, desc=' 【风筒读取中】'):
  247. fans = layer[1]["fans"]
  248. for f in fans:
  249. if 'fanlocal' not in f['strtype']: continue
  250. tun = self.tun_dict.get(f["ntunid"])
  251. if tun is None: continue
  252. path_list = []
  253. divIndex = self.layer_map[f['nlayerid']]['divIndex']
  254. for path in f['localFanCylinderPath']:
  255. from_to = (path[0]['x'], path[0]['z']), (path[1]['x'], path[1]['z'])
  256. path_line = Line(from_to, '2d', divIndex)
  257. path_list.append(path_line)
  258. no_center = Node((0, 0), '2d', divIndex)
  259. no_line = Line(((0, 0), (0, 0)), '2d', divIndex)
  260. air_duct = AirDuct(f['id'], f['nlayerid'], f['strname'], no_center, no_line, tun.width, tun.color,
  261. path_list)
  262. self.air_duct_list.append(air_duct)