"""Read a ventilation-network CAD graph (JSON over HTTP) and project it.

Every geometric item is produced in two coordinate variants:

* ``agg`` - all layers aggregated into one drawing, transformed with the
  model-wide offset/angle (``config['agg']``);
* ``div`` - each layer drawn separately, transformed with that layer's own
  offset/angle (``config['div']``).
"""
import json
import math

import requests
from tqdm import tqdm

import core

# The global rotation / scale / shift that used to be module-level constants
# are read from config['global'] (keys 'route', 'scale', 'shift').

# Width assigned to tunnels whose type is "1" (or that lack usable outlines).
_DEFAULT_TUN_WIDTH = 3.1415936
# A wind bridge is drawn this much wider than the tunnel it sits on.
_WIND_BRIDGE_WIDTH_FACTOR = 1.5114514
# Tunnels with an air quantity below this value get no airflow marker.
_MIN_AIR_FLOW = 0.3
# Unit suffix appended to the per-minute air quantity text.
_FQ_UNIT = "m³/min"


class CADJson:
    """Parsed and projected view of one CAD graph document.

    All ``get_*`` readers are run once from ``__init__`` and their results are
    stored as attributes (``tun_dict``, ``fan_list``, ``air_flow_list``, ...).

    Args:
        path: URL passed to ``request_graph_point`` to download the JSON.
        config: Mapping with the sections ``'global'`` (``route``, ``scale``,
            ``shift.x/y``), ``'agg'`` (``route``, ``scale``, ``shift.x/y``) and
            ``'div'`` (per-layer lists ``route``, ``scale``, ``shift.x/y``).
    """

    def __init__(self, path, config):
        self.json = request_graph_point(path)
        self.config = config
        # Order matters: later readers use the dictionaries built earlier.
        self.model = self.get_model()
        self.layer_map = self.get_layerMap()
        self.tun_plg_dict = self.get_tun_plg_dict()
        self.tun_dict = self.get_tun_dict()
        self.fan_list = self.get_fans()
        self.window_list = self.get_windows()
        self.gate_list = self.get_gates()
        self.wind_bridge_list = self.get_wind_bridge()
        self.sealed_list = self.get_sealed_list()
        self.air_flow_list = self.get_air_flow()
        self.air_flow_list_3d = self.get_air_flow_3d()
        self.window_list_3d = self.get_windows_3d()
        self.gates_list_3d = self.get_gates_3d()
        self.fans_list_3d = self.get_fans_3d()
        self.gaps_list = self.get_gap_node()
        self.tun_text_dict = self.get_tun_text()
        self.ari_duct_dict = self.get_ari_duct_dict()

    # ------------------------------------------------------------------
    # Coordinate helpers
    # ------------------------------------------------------------------
    def global_process(self, point):
        """Apply the global rotate -> scale -> shift transform to ``point``.

        The y coordinate is negated after shifting.

        Args:
            point: ``(x, y)`` pair in source coordinates.

        Returns:
            The transformed ``(x, y)`` tuple.
        """
        settings = self.config['global']
        point = core.rotate_point_around_another(point, (0, 0), settings['route'])
        point = core.scale_point(point[0], point[1], settings['scale'])
        return (
            point[0] + settings['shift']['x'],
            -(point[1] + settings['shift']['y']),
        )

    def _project(self, xy, layer_id):
        """Return ``{"agg": ..., "div": ...}`` for raw point ``xy`` on ``layer_id``."""
        processed = self.global_process(xy)
        return {
            "agg": layer_aggregation(processed, self.model),
            "div": layer_divide(processed, self.layer_map[layer_id]),
        }

    def _project_series(self, series, layer_id):
        """Project every ``(x, y)`` of ``series``; return parallel agg/div lists."""
        projected = [self._project(xy, layer_id) for xy in series]
        return {
            "agg": [p["agg"] for p in projected],
            "div": [p["div"] for p in projected],
        }

    @staticmethod
    def _route_of(tun):
        """Copy the agg/div drawing angles of tunnel record ``tun``."""
        return {"agg": tun["route"]['agg'], "div": tun["route"]['div']}

    @staticmethod
    def _format_fq(fq):
        """Format air quantity ``fq`` multiplied by 60 as text with the unit."""
        return str(round(fq * 60, 1)) + _FQ_UNIT

    def _layer_items(self, key, desc):
        """Yield every entry of ``layer[1][key]`` over all layers, with progress."""
        for layer in tqdm(self.json["layerMap"], desc=desc):
            yield from layer[1][key]

    # ------------------------------------------------------------------
    # Model / layers
    # ------------------------------------------------------------------
    def get_model(self):
        """Return the JSON ``model`` entry, annotated with the 'agg' transform.

        The entry is modified in place (``cadOffsetX``, ``cadOffsetY``,
        ``cadAngle``, ``scale``).
        """
        agg_cfg = self.config['agg']
        model = self.json["model"]
        model['cadOffsetX'] = agg_cfg['shift']['x']
        model['cadOffsetY'] = agg_cfg['shift']['y']
        model['cadAngle'] = agg_cfg['route']
        model['scale'] = agg_cfg['scale']
        return model

    def get_layerMap(self):
        """Return ``{layer_id: layer}``, each annotated with its 'div' transform.

        The i-th layer (in dictionary order) receives the i-th element of each
        list in ``config['div']``.
        """
        layer_map = {layer[0]: layer[1] for layer in self.json["layerMap"]}
        div_cfg = self.config['div']
        for i, layer in enumerate(layer_map.values()):
            layer['cadOffsetX'] = div_cfg['shift']['x'][i]
            layer['cadOffsetY'] = div_cfg['shift']['y'][i]
            layer['cadAngle'] = div_cfg['route'][i]
            layer['scale'] = div_cfg['scale'][i]
        return layer_map

    # ------------------------------------------------------------------
    # Tunnels
    # ------------------------------------------------------------------
    def get_tun_on_line(self):
        """Group tunnel end points by their label text.

        The label is ``"<strname> <air quantity>"``; tunnels sharing a label
        contribute their from/to points to the same entry.

        Returns:
            ``{label: {"fq_v", "fq", "text", "layer", "tun_line": {"agg", "div"}}}``
            where the scalar fields come from the first tunnel with that label.
        """
        tun_text_dict = {}
        # Tolerate a missing "tunsMap" like the other tunnel readers do.
        node_list = self.json.get("tunsMap", [])
        for node in tqdm(node_list, desc=' 【巷道文字读取中】'):
            node = node[1]
            if not check_node(node):
                continue
            # nusingtype "1" reports 'freportq' instead of 'fq' in the label.
            shown_fq = node['freportq'] if node['nusingtype'] == "1" else node['fq']
            fq_text = self._format_fq(shown_fq)
            tun_text = node["strname"] + " " + fq_text
            if tun_text not in tun_text_dict:
                tun_text_dict[tun_text] = {
                    "fq_v": node['fq'],
                    "fq": fq_text,
                    "text": tun_text,
                    "layer": node["nlayerid"],
                    "tun_line": {"agg": [], "div": []},
                }
            line = tun_text_dict[tun_text]["tun_line"]
            start = self._project((node["nfrom"]["x"], node["nfrom"]["z"]), node["nlayerid"])
            end = self._project((node["nto"]["x"], node["nto"]["z"]), node["nlayerid"])
            line["agg"].extend((start["agg"], end["agg"]))
            line["div"].extend((start["div"], end["div"]))
        return tun_text_dict

    def get_tun_text(self):
        """Return ``get_tun_on_line()`` with a ``'farthest'`` entry per label.

        ``'farthest'`` holds ``core.farthest_points`` of the collected agg and
        div points.
        """
        tun_text_dict = self.get_tun_on_line()
        for entry in tqdm(tun_text_dict.values(), desc=' 【巷道文字读取中】'):
            entry['farthest'] = {
                'agg': core.farthest_points(entry['tun_line']["agg"]),
                'div': core.farthest_points(entry['tun_line']["div"]),
            }
        return tun_text_dict

    def get_tun_plg_dict(self):
        """Build polygon ("3d") tunnel records keyed by ``ntunid``.

        Returns:
            ``{ntunid: record}``; an empty dict when ``tunsMap`` is missing or
            any tunnel lacks ``vecL1``.
        """
        if "tunsMap" not in self.json:
            return {}
        tun_plg_dict = {}
        for node in tqdm(self.json["tunsMap"], desc=' 【巷道读取中】'):
            obj = node[1]
            if "vecL1" not in obj:
                # NOTE(review): this discards every tunnel read so far, not just
                # this one - presumably "no polygon data in this document";
                # confirm that this is intended.
                return {}
            layer_id = obj["nlayerid"]

            def corners(prefix):
                """Return the raw (x, z) pairs of ``<prefix>1`` .. ``<prefix>4``."""
                return [
                    (obj[prefix + str(k)]['x'], obj[prefix + str(k)]['z'])
                    for k in range(1, 5)
                ]

            vec_l_series = corners('vecL')
            vec_r_series = corners('vecR')
            vec_b_series = corners('vec')
            plg = {
                "color": core.hex_to_rgb(self.layer_map[layer_id]['ncolor']),
                "layer_id": layer_id,
                # Width is measured on the raw (unprojected) outline.
                "width": core.min_distance_between_segments(vec_b_series[:2], vec_b_series[2:]),
                "type": obj["tunType"],
                "angle": obj["angleRad"],
                "air_type": obj["nairtype"],
                "from": self._project((obj["nfrom"]["x"], obj["nfrom"]["z"]), layer_id),
                "to": self._project((obj["nto"]["x"], obj["nto"]["z"]), layer_id),
                "name": obj["strname"],
                "vec_r": self._project_series(vec_r_series, layer_id),
                "vec_l": self._project_series(vec_l_series, layer_id),
                "vec_b": self._project_series(vec_b_series, layer_id),
            }
            plg['fq'] = self._format_fq(obj['fq'])
            plg['fq_v'] = obj['fq']
            plg["route"] = {
                "agg": core.calculate_angle_with_x_axis(plg['from']['agg'], plg['to']['agg']),
                "div": core.calculate_angle_with_x_axis(plg['from']['div'], plg['to']['div']),
            }
            tun_plg_dict[obj['ntunid']] = plg
        return tun_plg_dict

    def get_tun_dict(self):
        """Build outline (2D) tunnel records keyed by ``ntunid``.

        Tunnels rejected by ``check_node`` are skipped.

        Returns:
            ``{ntunid: record}``; an empty dict when ``tunsMap`` is missing.
        """
        if 'tunsMap' not in self.json:
            return {}
        tun_dict = {}
        for node in tqdm(self.json["tunsMap"], desc=' 【巷道读取中】'):
            node = node[1]
            if not check_node(node):
                continue
            layer_id = node["nlayerid"]
            layer = self.layer_map[layer_id]
            tun = {
                "tun_id": node["ntunid"],
                "layer_id": layer_id,
                "width": node["fwidth"],
                "type": node["tunType"],
                "angle": node["angleRad"],
                "air_type": node["nairtype"],
                "from": self._project((node["nfrom"]["x"], node["nfrom"]["z"]), layer_id),
                "to": self._project((node["nto"]["x"], node["nto"]["z"]), layer_id),
                "name": node["strname"],
                "vec12": {"agg": [], "div": []},
                "vec34": {"agg": [], "div": []},
            }
            tun['fq'] = self._format_fq(node['fq'])
            tun['fq_v'] = node['fq']

            head_vecs = node.get('headVecList')
            if head_vecs is not None and len(head_vecs) > 1:
                tun['head_vec_from'] = self._project(
                    (head_vecs[0]["x"], head_vecs[0]["z"]), layer_id)
                tun['head_vec_to'] = self._project(
                    (head_vecs[1]["x"], head_vecs[1]["z"]), layer_id)
            else:
                tun['head_vec_from'] = {"agg": None, "div": None}
                tun['head_vec_to'] = {"agg": None, "div": None}

            arrow_flag = node.get("arrowShow")
            tun['arrow_show'] = arrow_flag is not None and arrow_flag != 0

            # The agg outline comes from vecNN, the div outline from layerVecNN.
            for side in ("vec12", "vec34"):
                tun[side]["agg"] = [
                    layer_aggregation(self.global_process((n["x"], n["z"])), self.model)
                    for n in node[side]
                ]
                layer_key = "layerVec" + side[3:]
                tun[side]["div"] = [
                    layer_divide(self.global_process((n["x"], n["z"])), layer)
                    for n in node[layer_key]
                ]

            # Check the point lists that are indexed below (the original
            # measured the enclosing two-key dict, which is always length 2).
            has_outline = len(tun["vec12"]["div"]) > 1 and len(tun["vec34"]["div"]) > 1
            if tun["type"] != "1" and has_outline:
                seg1 = tun["vec12"]['div'][0], tun["vec12"]['div'][-1]
                seg2 = tun["vec34"]['div'][0], tun["vec34"]['div'][-1]
                tun["width"] = core.min_distance_between_segments(seg1, seg2)
                tun["route"] = {
                    "agg": core.calculate_angle_with_x_axis(tun['from']['agg'], tun['to']['agg']),
                    "div": core.calculate_angle_with_x_axis(tun['from']['div'], tun['to']['div']),
                }
            else:
                tun["route"] = {"agg": 0, "div": 0}
                tun["width"] = _DEFAULT_TUN_WIDTH
            tun_dict[tun["tun_id"]] = tun
        return tun_dict

    def get_gap_node(self):
        """Return, per ``nodeFaceMap`` entry, the agg points of its screen nodes.

        Only members carrying a ``screenX`` key contribute a point.
        """
        gaps_list = []
        for node in self.json['nodeFaceMap']:
            gaps = [
                layer_aggregation(self.global_process((n['screenX'], n['screenY'])), self.model)
                for n in node
                if 'screenX' in n
            ]
            gaps_list.append(gaps)
        return gaps_list

    # ------------------------------------------------------------------
    # Devices placed on tunnels (windows, fans, gates, seals, ducts)
    # ------------------------------------------------------------------
    def get_windows(self):
        """Return air-window records positioned by their ``x``/``z`` fields.

        Windows whose tunnel is not in ``tun_dict`` are skipped.
        """
        window_list = []
        for w in self._layer_items("windows", ' 【风窗读取中】'):
            tun = self.tun_dict.get(w["ntunid"])
            if tun is None:
                continue
            window_list.append({
                "layer": w["nlayerid"],
                "color": core.get_color_by_layer(w["nlayerid"]),
                "name": w["strname"],
                "id": w["id"],
                "center": self._project((w['x'], w['z']), w["nlayerid"]),
                'route': self._route_of(tun),
                'width': tun['width'],
            })
        return window_list

    def get_windows_3d(self):
        """Return air-window records positioned by ``screenX``/``screenY``.

        Windows whose tunnel is not in ``tun_plg_dict`` are skipped.
        """
        window_list = []
        for w in self._layer_items("windows", ' 【风窗读取中】'):
            tun = self.tun_plg_dict.get(w["ntunid"])
            if tun is None:
                continue
            window_list.append({
                "layer": w["nlayerid"],
                "color": core.get_color_by_layer(w["nlayerid"]),
                "name": w["strname"],
                "id": w["id"],
                "center": self._project((w['screenX'], w['screenY']), w["nlayerid"]),
                'route': self._route_of(tun),
                'width': tun['width'],
            })
        return window_list

    def get_fans_3d(self):
        """Return fan records positioned by ``screenX``/``screenY``.

        Fans whose tunnel is not in ``tun_plg_dict`` are skipped.
        """
        fans_list = []
        for f in self._layer_items("fans", ' 【风扇读取中】'):
            tun = self.tun_plg_dict.get(f["ntunid"])
            if tun is None:
                continue
            fans_list.append({
                "layer": f["nlayerid"],
                "color": core.get_color_by_layer(f["nlayerid"]),
                "name": f["strname"],
                "id": f["id"],
                "type": f["strtype"],
                "center": self._project((f['screenX'], f['screenY']), f["nlayerid"]),
                'route': self._route_of(tun),
                'width': tun['width'],
            })
        return fans_list

    def get_ari_duct_dict(self):
        """Return ``{fan_id: [duct segment, ...]}`` from ``localFanCylinderPath``.

        Each path contributes one segment from ``path[0]`` to ``path[1]``.
        Fans whose tunnel is not in ``tun_dict`` are skipped.
        """
        ari_duct_dict = {}
        for f in self._layer_items("fans", ' 【风筒读取中】'):
            tun = self.tun_dict.get(f["ntunid"])
            if tun is None:
                continue
            layer_id = f["nlayerid"]
            ari_duct_dict[f['id']] = [
                {
                    "layer": layer_id,
                    "from": self._project((path[0]['x'], path[0]['z']), layer_id),
                    "to": self._project((path[1]['x'], path[1]['z']), layer_id),
                    'route': self._route_of(tun),
                    'width': tun['width'],
                }
                for path in f['localFanCylinderPath']
            ]
        return ari_duct_dict

    def get_fans(self):
        """Return fan records positioned by their ``x``/``z`` fields.

        Fans whose tunnel is not in ``tun_dict`` are skipped.
        """
        fans_list = []
        for f in self._layer_items("fans", ' 【风扇读取中】'):
            tun = self.tun_dict.get(f["ntunid"])
            if tun is None:
                continue
            fans_list.append({
                "layer": f["nlayerid"],
                "color": core.get_color_by_layer(f["nlayerid"]),
                "type": f["strtype"],
                "name": f["strname"],
                "id": f["id"],
                "center": self._project((f['x'], f['z']), f["nlayerid"]),
                'route': self._route_of(tun),
                'width': tun['width'],
            })
        return fans_list

    def get_gates_3d(self):
        """Return air-door records positioned by ``screenX``/``screenY``.

        Gates whose tunnel is not in ``tun_plg_dict`` are skipped.
        """
        gate_list = []
        for g in self._layer_items("gates", ' 【风门读取中】'):
            tun = self.tun_plg_dict.get(g["ntunid"])
            if tun is None:
                continue
            gate_list.append({
                "layer": g["nlayerid"],
                "color": core.get_color_by_layer(g["nlayerid"]),
                "type": g["strtype"],
                "name": g["strname"],
                "id": g["id"],
                "center": self._project((g['screenX'], g['screenY']), g["nlayerid"]),
                'route': self._route_of(tun),
                'width': tun['width'],
            })
        return gate_list

    def get_sealed_list(self):
        """Return seal records (``obfurages``) positioned by ``x``/``z``.

        Seals whose tunnel is not in ``tun_dict`` are skipped, as every other
        device reader does; previously such a seal raised ``TypeError``.
        """
        sealed_list = []
        for s in self._layer_items("obfurages", ' 【密闭读取中】'):
            tun = self.tun_dict.get(s["ntunid"])
            if tun is None:
                continue
            sealed_list.append({
                "layer": s["nlayerid"],
                "color": core.get_color_by_layer(s["nlayerid"]),
                "id": s["id"],
                "center": self._project((s['x'], s['z']), s["nlayerid"]),
                'route': self._route_of(tun),
                'tun_id': s["ntunid"],
                'width': tun['width'],
            })
        return sealed_list

    def get_gates(self):
        """Return air-door records positioned by their ``x``/``z`` fields.

        Gates whose tunnel is not in ``tun_dict`` are skipped.
        """
        gate_list = []
        for g in self._layer_items("gates", ' 【风门读取中】'):
            tun = self.tun_dict.get(g["ntunid"])
            if tun is None:
                continue
            gate_list.append({
                "layer": g["nlayerid"],
                "color": core.get_color_by_layer(g["nlayerid"]),
                "type": g["strtype"],
                "name": g["strname"],
                "id": g["id"],
                "center": self._project((g['x'], g['z']), g["nlayerid"]),
                'route': self._route_of(tun),
                'width': tun['width'],
            })
        return gate_list

    # ------------------------------------------------------------------
    # Segment geometry
    # ------------------------------------------------------------------
    def calculate_route_middle(self, path, layer_id):
        """Return midpoint and angle of the line from ``path[0]`` to ``path[-1]``.

        Args:
            path: Sequence of dicts with ``'x'`` and ``'z'`` entries.
            layer_id: Key into ``layer_map`` used for the div variant.

        Returns:
            ``{"middle": {"agg", "div"}, "route": {"agg", "div"}}``.
        """
        end_points = (
            (path[0]['x'], path[0]['z']),
            (path[-1]['x'], path[-1]['z']),
        )
        return self.calculate_segment_route_middle(end_points, layer_id)

    def calculate_segment_route_middle(self, vector, layer_id):
        """Return midpoint and angle of the segment ``vector[0]`` -> ``vector[1]``.

        Args:
            vector: Pair of raw ``(x, y)`` points.
            layer_id: Key into ``layer_map`` used for the div variant.

        Returns:
            ``{"middle": {"agg", "div"}, "route": {"agg", "div"}}``.
        """
        start = self._project(vector[0], layer_id)
        end = self._project(vector[1], layer_id)
        return {
            "middle": {
                "agg": core.find_point_on_line(start["agg"], end["agg"], 1 / 2),
                "div": core.find_point_on_line(start["div"], end["div"], 1 / 2),
            },
            "route": {
                "agg": core.calculate_angle_with_x_axis(start["agg"], end["agg"]),
                "div": core.calculate_angle_with_x_axis(start["div"], end["div"]),
            },
        }

    # ------------------------------------------------------------------
    # Airflow direction markers
    # ------------------------------------------------------------------
    @staticmethod
    def _flow_entries(tun, distance, single):
        """Build the airflow markers of one tunnel.

        With ``single`` one marker is placed at the midpoint; otherwise the
        tunnel is divided with ``core.divide_segment`` into
        ``int(distance // (100 * width))`` parts and the interior points are
        used.
        """
        start_agg, end_agg = tun['from']['agg'], tun['to']['agg']
        start_div, end_div = tun["from"]["div"], tun["to"]["div"]
        if single:
            centers = [(
                core.find_point_on_line(start_agg, end_agg, 1 / 2),
                core.find_point_on_line(start_div, end_div, 1 / 2),
            )]
        else:
            space = int(distance // (100 * tun['width']))
            centers = zip(
                core.divide_segment(start_agg, end_agg, space)[1:-1],
                core.divide_segment(start_div, end_div, space)[1:-1],
            )
        flow_len = tun['width'] * 5
        return [
            {
                "layer": tun['layer_id'],
                "center": {"agg": agg_point, "div": div_point},
                "route": {"agg": tun["route"]['agg'], "div": tun['route']['div']},
                "type": tun['air_type'],
                "len": flow_len,
            }
            for agg_point, div_point in centers
        ]

    def get_air_flow_3d(self):
        """Return airflow markers for the polygon tunnels (``tun_plg_dict``).

        Tunnels shorter than six widths or with ``fq_v`` below 0.3 get none;
        tunnels between six and twelve widths long get a single marker.
        """
        air_flow_3d_list = []
        for tun in tqdm(self.tun_plg_dict.values(), desc='【风流方向读取中】'):
            distance = core.distance(tun['from']['agg'], tun['to']['agg'])
            width = tun['width']
            if distance < 6 * width:
                continue
            if tun['fq_v'] < _MIN_AIR_FLOW:
                continue
            single = 6 * width < distance < 12 * width
            air_flow_3d_list.extend(self._flow_entries(tun, distance, single))
        return air_flow_3d_list

    def get_air_flow(self):
        """Return airflow markers for the outline tunnels (``tun_dict``).

        Tunnels with an agg length strictly between 50 and 100 get a single
        midpoint marker; all others are subdivided.
        """
        air_flow_list = []
        sealed_tun_ids = {obj["tun_id"] for obj in self.sealed_list}
        for tun in tqdm(self.tun_dict.values(), desc='【风流方向读取中】'):
            distance = core.distance(tun['from']['agg'], tun['to']['agg'])
            # NOTE(review): the source formatting was collapsed, so the nesting
            # of the last two filters under `arrow_show` is reconstructed
            # (an explicit arrow flag bypasses all three) - confirm.
            if not tun['arrow_show']:
                if tun['tun_id'] in sealed_tun_ids:
                    continue  # no airflow is drawn in sealed tunnels
                if tun['fq_v'] < _MIN_AIR_FLOW:
                    continue
                if distance < 6 * tun['width']:
                    continue
            single = 50 < distance < 100
            air_flow_list.extend(self._flow_entries(tun, distance, single))
        return air_flow_list

    # ------------------------------------------------------------------
    # Wind bridges (tunnel crossings)
    # ------------------------------------------------------------------
    def _wind_bridges(self, tun_lookup):
        """Build wind-bridge records, taking angle/width from ``tun_lookup``."""
        wind_bridge_list = []
        for layer in tqdm(self.json["layerMap"], desc=' 【风桥读取中】'):
            layer_id = layer[0]
            for cross_node in layer[1]['crossNodes']:
                cross_point = cross_node['crossPoint']
                # The crossing's y coordinate is negated before projecting.
                center = cross_point['x'], -cross_point['y']
                tun = tun_lookup.get(cross_node["tun1Id"])
                if tun is None:
                    continue
                wind_bridge_list.append({
                    'layer': layer_id,
                    'color': core.get_color_by_layer(layer_id),
                    "center": self._project(center, layer_id),
                    'route': self._route_of(tun),
                    'width': tun['width'] * _WIND_BRIDGE_WIDTH_FACTOR,
                })
        return wind_bridge_list

    def get_wind_bridge(self):
        """Return wind-bridge records based on ``tun_dict``.

        Crossings whose ``tun1Id`` is unknown are skipped.
        """
        return self._wind_bridges(self.tun_dict)

    def get_wind_bridge_3d(self):
        """Return wind-bridge records based on ``tun_plg_dict``.

        Crossings whose ``tun1Id`` is unknown are skipped.
        """
        return self._wind_bridges(self.tun_plg_dict)


def _layer_coefficient(mapping, key):
    """Return ``mapping[key]``, or 0 when the key is missing or ``None``."""
    value = mapping.get(key)
    return 0 if value is None else value


def layer_aggregation(point, model):
    """Rotate ``point`` about the origin by the model angle, then offset it.

    Args:
        point: ``(x, y)`` pair.
        model: Mapping optionally holding ``cadOffsetX``, ``cadOffsetY`` and
            ``cadAngle``; missing or ``None`` values count as 0.

    Returns:
        The transformed ``(x, y)`` tuple.
    """
    rotated = core.rotate_point_around_another(
        point, (0, 0), _layer_coefficient(model, 'cadAngle'))
    return (
        rotated[0] + _layer_coefficient(model, 'cadOffsetX'),
        rotated[1] + _layer_coefficient(model, 'cadOffsetY'),
    )


def layer_divide(point, layer):
    """Rotate ``point`` about the origin by the layer angle, then offset it.

    Missing or ``None`` coefficients count as 0. The previous condition
    (``key in layer or layer[key] is None``) raised ``KeyError`` for a missing
    key and passed ``None`` through to the arithmetic.

    Args:
        point: ``(x, y)`` pair.
        layer: Mapping optionally holding ``cadOffsetX``, ``cadOffsetY`` and
            ``cadAngle``.

    Returns:
        The transformed ``(x, y)`` tuple.
    """
    rotated = core.rotate_point_around_another(
        point, (0, 0), _layer_coefficient(layer, 'cadAngle'))
    return (
        rotated[0] + _layer_coefficient(layer, 'cadOffsetX'),
        rotated[1] + _layer_coefficient(layer, 'cadOffsetY'),
    )


def request_graph_point(url):
    """Download the graph JSON from ``url`` (10 second timeout).

    Args:
        url: Address of the JSON document.

    Returns:
        The decoded JSON on HTTP 200; otherwise an empty dict (the status code
        is printed).
    """
    response = requests.get(url, timeout=10)
    print(f"请求耗时{url} {response.elapsed.total_seconds()} 秒")
    if response.status_code != 200:
        print(f"请求失败,状态码:{response.status_code}")
        return {}
    return response.json()


def check_node(node):
    """Tell whether a tunnel node has usable outline vectors.

    Each of ``vec12``, ``vec34``, ``layerVec12`` and ``layerVec34`` must be
    present, not ``None``, hold at least two points, and the first two points
    must have non-``None`` ``x`` and ``z``.

    Lists with a single point, and nodes without ``vec12``/``vec34``, are now
    rejected instead of raising ``IndexError``/``KeyError``.

    Args:
        node: Tunnel mapping from ``tunsMap``.

    Returns:
        ``True`` when the node can be processed, ``False`` otherwise.
    """
    for key in ("vec12", "vec34", "layerVec12", "layerVec34"):
        points = node.get(key)
        if points is None or len(points) < 2:
            return False
        for vertex in points[:2]:
            if vertex['x'] is None or vertex['z'] is None:
                return False
    return True