import json import math import requests from tqdm import tqdm import core # 旋转 # global_route = -math.pi / 4 global_route = 0 # 缩放 global_scale = 1 # 偏移 global_shift = 0, 0 def global_process(point): point = core.rotate_point_around_another(point, (0, 0), global_route) point = core.scale_point(point[0], point[1], global_scale) point = point[0] + global_shift[0], - (point[1] + global_shift[1]) return point def layer_aggregation(point, model): coefficient_x = model['cadOffsetX'] if 'cadOffsetX' in model.keys() else 0 coefficient_y = model['cadOffsetY'] if 'cadOffsetY' in model.keys() else 0 coefficient_route = model['cadAngle'] if 'cadAngle' in model.keys() else 0 point_ = core.rotate_point_around_another(point, (0, 0), coefficient_route) point_ = point_[0] + coefficient_x, point_[1] + coefficient_y return point_ def layer_divide(point, layer): coefficient_x = layer['cadOffsetX'] if 'cadOffsetX' in layer.keys() or layer['cadOffsetX'] is None else 0 coefficient_y = layer['cadOffsetY'] if 'cadOffsetY' in layer.keys() or layer['cadOffsetY'] is None else 0 coefficient_route = layer['cadAngle'] if 'cadAngle' in layer.keys() or layer['cadAngle'] is None else 0 point_ = core.rotate_point_around_another(point, (0, 0), coefficient_route) point_ = point_[0] + coefficient_x, point_[1] + coefficient_y return point_ def request_graph_point(url): response = requests.get(url, timeout=10) print(f"请求耗时 {response.elapsed.total_seconds()} 秒") cad_json = {} if response.status_code == 200: cad_json = response.json() # 将响应内容解析为JSON格式 else: print(f"请求失败,状态码:{response.status_code}") return cad_json def check_node(node): flag = True if len(node["vec12"]) < 1 or len(node["vec34"]) < 1: flag = False return flag else: if node["vec12"][0]['x'] is None or node["vec12"][0]['z'] is None: flag = False if node["vec12"][1]['x'] is None or node["vec12"][1]['z'] is None: flag = False if node["vec34"][0]['x'] is None or node["vec34"][0]['z'] is None: flag = False if node["vec34"][1]['x'] is None or node["vec34"][1]['z'] is None: flag = False if node["layerVec12"][0]['x'] is None or node["layerVec12"][0]['z'] is None: flag = False if node["layerVec12"][1]['x'] is None or node["layerVec12"][1]['z'] is None: flag = False if node["layerVec34"][0]['x'] is None or node["layerVec34"][0]['z'] is None: flag = False if node["layerVec34"][1]['x'] is None or node["layerVec34"][1]['z'] is None: flag = False return flag class CADJson: def __init__(self, path): if "http://" in path or "https://" in path: self.json = request_graph_point(path) else: with open(path, 'r', encoding='utf-8') as r: self.json = json.loads(r.read()) self.model = self.json["model"] self.layer_map = self.get_layerMap() # self.tun_list = self.get_tuns() self.tun_dict = self.get_tun_dict() self.fan_list = self.get_fans() self.window_list = self.get_windows() self.gate_list = self.get_gates() # self.wind_flow_list = self.get_wind_flow() self.wind_bridge_list = self.get_wind_bridge() self.tun_dict = self.get_tun_dict() self.air_flow_list = self.get_air_flow() def get_layerMap(self): layer_list = self.json["layerMap"] layer_map = {} for layer in layer_list: layer_map[layer[0]] = layer[1] return layer_map def get_tun_dict(self): node_list = self.json["tunsMap"] tun_dict = {} for node in tqdm(node_list, desc=' 【巷道读取中】'): node = node[1] if check_node(node) is False: continue tun = { "tun_id": node["ntunid"], "layer_id": node["nlayerid"], "width": node["fwidth"], "type": node["tunType"], "angle": node["angleRad"], "air_type": node["nairtype"], "from": {"agg": layer_aggregation(global_process((node["nfrom"]["x"], node["nfrom"]["z"])), self.model), "div": layer_divide(global_process((node["nfrom"]["x"], node["nfrom"]["z"])), self.layer_map[node["nlayerid"]]) }, "to": {"agg": layer_aggregation(global_process((node["nto"]["x"], node["nto"]["z"])), self.model), "div": layer_divide(global_process((node["nto"]["x"], node["nto"]["z"])), self.layer_map[node["nlayerid"]])}, "name": node["strname"], "vec12": {"agg": [], "div": []}, "vec34": {"agg": [], "div": []} } for n in node["vec12"]: agg_point = layer_aggregation(global_process((n["x"], n["z"])), self.model) tun["vec12"]["agg"].append(agg_point) for n in node["layerVec12"]: div_point = layer_divide(global_process((n["x"], n["z"])), self.layer_map[node["nlayerid"]]) tun["vec12"]["div"].append(div_point) for n in node["vec34"]: agg_point = layer_aggregation(global_process((n["x"], n["z"])), self.model) tun["vec34"]["agg"].append(agg_point) for n in node["layerVec34"]: div_point = layer_divide(global_process((n["x"], n["z"])), self.layer_map[node["nlayerid"]]) tun["vec34"]["div"].append(div_point) if tun["type"] != "1" and len(tun["vec12"]) > 1 and len(tun["vec34"]) > 1: seg1 = tun["vec12"]['div'][0], tun["vec12"]['div'][-1] seg2 = tun["vec34"]['div'][0], tun["vec34"]['div'][-1] tun["width"] = core.min_distance_between_segments(seg1, seg2) tun["route"] = { "agg": core.calculate_angle_with_x_axis(tun['from']['agg'], tun['to']['agg']), "div": core.calculate_angle_with_x_axis(tun['from']['div'], tun['to']['div']) } else: tun["route"] = { "agg": 0, "div": 0 } tun["width"] = 3.1415936 tun_dict[tun["tun_id"]] = tun return tun_dict def get_tuns(self): node_list = self.json["tunsMap"] tun_list = [] for node in tqdm(node_list, desc=' 【巷道读取中】'): node = node[1] if check_node(node) is False: continue # node['vec12'] = node['layerVec12'] # node['vec34'] = node['layerVec34'] tun = { "tun_id": node["ntunid"], "layer_id": node["nlayerid"], "width": node["fwidth"], "type": node["tunType"], "air_type": node["nairtype"], "angle": node["angleRad"], "from": {"agg": layer_aggregation(global_process((node["nfrom"]["x"], node["nfrom"]["z"])), self.model), "div": layer_divide(global_process((node["nfrom"]["x"], node["nfrom"]["z"])), self.layer_map[node["nlayerid"]]) }, "to": {"agg": layer_aggregation(global_process((node["nto"]["x"], node["nto"]["z"])), self.model), "div": layer_divide(global_process((node["nto"]["x"], node["nto"]["z"])), self.layer_map[node["nlayerid"]])}, "name": node["strname"], "vec12": {"agg": [], "div": []}, "vec34": {"agg": [], "div": []} } for n in node["vec12"]: agg_point = layer_aggregation(global_process((n["x"], n["z"])), self.model) tun["vec12"]["agg"].append(agg_point) for n in node["layerVec12"]: div_point = layer_divide(global_process((n["x"], n["z"])), self.layer_map[node["nlayerid"]]) tun["vec12"]["div"].append(div_point) for n in node["vec34"]: agg_point = layer_aggregation(global_process((n["x"], n["z"])), self.model) tun["vec34"]["agg"].append(agg_point) for n in node["layerVec34"]: div_point = layer_divide(global_process((n["x"], n["z"])), self.layer_map[node["nlayerid"]]) tun["vec34"]["div"].append(div_point) if tun["type"] != "1" and len(tun["vec12"]) > 1 and len(tun["vec34"]) > 1: seg1 = tun["vec12"]['div'][0], tun["vec12"]['div'][-1] seg2 = tun["vec34"]['div'][0], tun["vec34"]['div'][-1] tun["width"] = core.min_distance_between_segments(seg1, seg2) tun["route"] = { "agg": core.calculate_angle_with_x_axis(tun['from']['agg'], tun['to']['agg']), "div": core.calculate_angle_with_x_axis(tun['from']['div'], tun['to']['div']) } else: tun["route"] = { "agg": 0, "div": 0 } tun["width"] = 3.1415936 print(tun['air_type']) tun_list.append(tun) return tun_list def get_windows(self): layer_map = self.json["layerMap"] window_list = [] for layer in tqdm(layer_map, desc=' 【风窗读取中】'): windows = layer[1]["windows"] for w in windows: tun = self.tun_dict.get(w["ntunid"]) if tun is None: continue # tun = [tun_ for tun_ in self.tun_list if tun_.get("tun_id") == w["ntunid"]][0] window = {"layer": w["nlayerid"], "color": core.get_color_by_layer(w["nlayerid"]), "name": w["strname"], "id": w["id"], "center": { "agg": layer_aggregation(global_process((w['x'], w['z'])), self.model), "div": layer_divide(global_process((w['x'], w['z'])), self.layer_map[w["nlayerid"]]) }, 'route': {"agg": tun["route"]['agg'], "div": tun["route"]['div'] }, 'width': tun['width'] } window_list.append(window) return window_list def get_fans(self): layer_map = self.json["layerMap"] fans_list = [] for layer in tqdm(layer_map, desc=' 【风扇读取中】'): fans = layer[1]["fans"] for f in fans: tun = self.tun_dict.get(f["ntunid"]) if tun is None: continue fan = {"layer": f["nlayerid"], "color": core.get_color_by_layer(f["nlayerid"]), "type": f["strtype"], "name": f["strname"], "id": f["id"], "center": { "agg": layer_aggregation(global_process((f['x'], f['z'])), self.model), "div": layer_divide(global_process((f['x'], f['z'])), self.layer_map[f["nlayerid"]]) }, 'route': {"agg": tun["route"]['agg'], "div": tun["route"]['div'] }, 'width': tun['width'] } fans_list.append(fan) return fans_list def get_gates(self): layer_map = self.json["layerMap"] gate_list = [] for layer in tqdm(layer_map, desc=' 【风门读取中】'): gates = layer[1]["gates"] for g in gates: tun = self.tun_dict.get(g["ntunid"]) if tun is None: continue gate = {"layer": g["nlayerid"], "color": core.get_color_by_layer(g["nlayerid"]), "type": g["strtype"], "name": g["strname"], "id": g["id"], "center": { "agg": layer_aggregation(global_process((g['x'], g['z'])), self.model), "div": layer_divide(global_process((g['x'], g['z'])), self.layer_map[g["nlayerid"]]) }, 'route': {"agg": tun["route"]['agg'], "div": tun["route"]['div'] }, 'width': tun['width'] } gate_list.append(gate) return gate_list def calculate_route_middle(self, path, layer_id): path_start = global_process((path[0]['x'], path[0]['z'])) path_end = global_process((path[-1]['x'], path[-1]['z'])) path_start_agg = layer_aggregation(path_start, self.model) path_end_agg = layer_aggregation(path_end, self.model) path_start_div = layer_divide(path_start, self.layer_map[layer_id]) path_end_div = layer_divide(path_end, self.layer_map[layer_id]) path_middle_agg = core.find_point_on_line(path_start_agg, path_end_agg, 1 / 2) path_middle_div = core.find_point_on_line(path_start_div, path_end_div, 1 / 2) route_agg = core.calculate_angle_with_x_axis(path_start_agg, path_end_agg) route_div = core.calculate_angle_with_x_axis(path_start_div, path_end_div) return {"middle": {"agg": path_middle_agg, "div": path_middle_div}, "route": {"agg": route_agg, "div": route_div}} def calculate_segment_route_middle(self, vector, layer_id): path_start = global_process(vector[0]) path_end = global_process(vector[1]) path_start_agg = layer_aggregation(path_start, self.model) path_end_agg = layer_aggregation(path_end, self.model) path_start_div = layer_divide(path_start, self.layer_map[layer_id]) path_end_div = layer_divide(path_end, self.layer_map[layer_id]) path_middle_agg = core.find_point_on_line(path_start_agg, path_end_agg, 1 / 2) path_middle_div = core.find_point_on_line(path_start_div, path_end_div, 1 / 2) route_agg = core.calculate_angle_with_x_axis(path_start_agg, path_end_agg) route_div = core.calculate_angle_with_x_axis(path_start_div, path_end_div) return {"middle": {"agg": path_middle_agg, "div": path_middle_div}, "route": {"agg": route_agg, "div": route_div}} def get_air_flow(self): air_flow_list = [] for tun in tqdm(self.tun_dict.values(), desc='【风流方向读取中】'): vec_agg = tun['from']['agg'], tun['to']['agg'] vec_div = tun["from"]["div"], tun["to"]["div"] distance = core.distance(vec_agg[0], vec_agg[1]) if 50 < distance < 100: middle_agg = core.find_point_on_line(vec_agg[0], vec_agg[1], 1 / 2) middle_div = core.find_point_on_line(vec_div[0], vec_div[1], 1 / 2) air_flow = { "layer": tun['layer_id'], "center": { "agg": middle_agg, "div": middle_div }, "route": {"agg": tun["route"]['agg'], "div": tun['route']['div']}, "type": tun['air_type'] } air_flow_list.append(air_flow) else: space = int(distance // 100) agg_points = core.divide_segment(vec_agg[0], vec_agg[1], space)[1:-1] div_points = core.divide_segment(vec_div[0], vec_div[1], space)[1:-1] for i in range(len(agg_points)): air_flow = { "layer": tun['layer_id'], "center": { "agg": agg_points[i], "div": div_points[i] }, "route": {"agg": tun["route"]['agg'], "div": tun['route']['div']}, "type": tun['air_type'] } air_flow_list.append(air_flow) return air_flow_list # def get_wind_flow(self): # path_point_map = self.json["pathLineMap"] # wind_flow_list = { # "in": [], # "no": [], # "out": [], # "use": [] # } # for path_point in tqdm(path_point_map, desc=' 【风流方向读取中】'): # in_paths = path_point[1]['inPaths'] # for path in in_paths: # middle_route = self.calculate_route_middle(path, path_point[0]) # in_path = { # "layer": path_point[0], # "center": { # "agg": middle_route["middle"]["agg"], # "div": middle_route["middle"]["div"] # }, # "route": {"agg": middle_route["route"]["agg"], # "div": middle_route["route"]["div"]}, # "color": "3", # "type": "3" # } # wind_flow_list["in"].append(in_path) # # no_paths = path_point[1]['noPaths'] # for path in no_paths: # middle_route = self.calculate_route_middle(path, path_point[0]) # no_path = { # "layer": path_point[0], # "center": { # "agg": middle_route["middle"]["agg"], # "div": middle_route["middle"]["div"] # }, # "route": {"agg": middle_route["route"]["agg"], # "div": middle_route["route"]["div"]}, # "color": "3", # "type": "3" # } # wind_flow_list["no"].append(no_path) # # out_paths = path_point[1]['outPaths'] # for path in out_paths: # middle_route = self.calculate_route_middle(path, path_point[0]) # out_path = { # "layer": path_point[0], # "center": { # "agg": middle_route["middle"]["agg"], # "div": middle_route["middle"]["div"] # }, # "route": {"agg": middle_route["route"]["agg"], # "div": middle_route["route"]["div"]}, # "color": "1", # "type": "1" # } # wind_flow_list["out"].append(out_path) # use_paths = path_point[1]['usePaths'] # for path in use_paths: # middle_route = self.calculate_route_middle(path, path_point[0]) # use_path = { # "layer": path_point[0], # "center": { # "agg": middle_route["middle"]["agg"], # "div": middle_route["middle"]["div"] # }, # "route": {"agg": middle_route["route"]["agg"], # "div": middle_route["route"]["div"]}, # "color": "1", # "type": "1" # } # wind_flow_list["use"].append(use_path) # # return wind_flow_list def get_wind_bridge(self): layer_map = self.json["layerMap"] wind_bridge_list = [] for layer in tqdm(layer_map, desc=' 【风桥读取中】'): cross_nodes = layer[1]['crossNodes'] for cross_node in cross_nodes: center = cross_node['crossPoint']['x'], -cross_node['crossPoint']['y'] # tun = [tun_ for tun_ in self.tun_list if tun_.get("tun_id") == cross_node['tun1Id']][0] tun = self.tun_dict.get(cross_node["tun1Id"]) if tun is None: continue wind_bridge = { 'layer': layer[0], 'color': core.get_color_by_layer(layer[0]), "center": { "agg": layer_aggregation(global_process(center), self.model), "div": layer_divide(global_process(center), self.layer_map[layer[0]]) }, 'route': {"agg": tun["route"]['agg'], "div": tun['route']['div']}, 'width': tun['width'] * 1.5114514 } wind_bridge_list.append(wind_bridge) return wind_bridge_list