123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501 |
- import json
- import math
- import requests
- from tqdm import tqdm
- import core
- # 旋转
- # global_route = -math.pi / 4
- global_route = 0
- # 缩放
- global_scale = 1
- # 偏移
- global_shift = 0, 0
- def global_process(point):
- point = core.rotate_point_around_another(point, (0, 0), global_route)
- point = core.scale_point(point[0], point[1], global_scale)
- point = point[0] + global_shift[0], - (point[1] + global_shift[1])
- return point
- def layer_aggregation(point, model):
- coefficient_x = model['cadOffsetX'] if 'cadOffsetX' in model.keys() else 0
- coefficient_y = model['cadOffsetY'] if 'cadOffsetY' in model.keys() else 0
- coefficient_route = model['cadAngle'] if 'cadAngle' in model.keys() else 0
- point_ = core.rotate_point_around_another(point, (0, 0), coefficient_route)
- point_ = point_[0] + coefficient_x, point_[1] + coefficient_y
- return point_
- def layer_divide(point, layer):
- coefficient_x = layer['cadOffsetX'] if 'cadOffsetX' in layer.keys() or layer['cadOffsetX'] is None else 0
- coefficient_y = layer['cadOffsetY'] if 'cadOffsetY' in layer.keys() or layer['cadOffsetY'] is None else 0
- coefficient_route = layer['cadAngle'] if 'cadAngle' in layer.keys() or layer['cadAngle'] is None else 0
- point_ = core.rotate_point_around_another(point, (0, 0), coefficient_route)
- point_ = point_[0] + coefficient_x, point_[1] + coefficient_y
- return point_
- def request_graph_point(url):
- response = requests.get(url, timeout=10)
- print(f"请求耗时 {response.elapsed.total_seconds()} 秒")
- cad_json = {}
- if response.status_code == 200:
- cad_json = response.json() # 将响应内容解析为JSON格式
- else:
- print(f"请求失败,状态码:{response.status_code}")
- return cad_json
- def check_node(node):
- flag = True
- if len(node["vec12"]) < 1 or len(node["vec34"]) < 1:
- flag = False
- return flag
- else:
- if node["vec12"][0]['x'] is None or node["vec12"][0]['z'] is None:
- flag = False
- if node["vec12"][1]['x'] is None or node["vec12"][1]['z'] is None:
- flag = False
- if node["vec34"][0]['x'] is None or node["vec34"][0]['z'] is None:
- flag = False
- if node["vec34"][1]['x'] is None or node["vec34"][1]['z'] is None:
- flag = False
- if node["layerVec12"][0]['x'] is None or node["layerVec12"][0]['z'] is None:
- flag = False
- if node["layerVec12"][1]['x'] is None or node["layerVec12"][1]['z'] is None:
- flag = False
- if node["layerVec34"][0]['x'] is None or node["layerVec34"][0]['z'] is None:
- flag = False
- if node["layerVec34"][1]['x'] is None or node["layerVec34"][1]['z'] is None:
- flag = False
- return flag
- class CADJson:
- def __init__(self, path):
- if "http://" in path or "https://" in path:
- self.json = request_graph_point(path)
- else:
- with open(path, 'r', encoding='utf-8') as r:
- self.json = json.loads(r.read())
- self.model = self.json["model"]
- self.layer_map = self.get_layerMap()
- # self.tun_list = self.get_tuns()
- self.tun_dict = self.get_tun_dict()
- self.fan_list = self.get_fans()
- self.window_list = self.get_windows()
- self.gate_list = self.get_gates()
- # self.wind_flow_list = self.get_wind_flow()
- self.wind_bridge_list = self.get_wind_bridge()
- self.tun_dict = self.get_tun_dict()
- self.air_flow_list = self.get_air_flow()
- def get_layerMap(self):
- layer_list = self.json["layerMap"]
- layer_map = {}
- for layer in layer_list:
- layer_map[layer[0]] = layer[1]
- return layer_map
- def get_tun_dict(self):
- node_list = self.json["tunsMap"]
- tun_dict = {}
- for node in tqdm(node_list, desc=' 【巷道读取中】'):
- node = node[1]
- if check_node(node) is False: continue
- tun = {
- "tun_id": node["ntunid"], "layer_id": node["nlayerid"], "width": node["fwidth"],
- "type": node["tunType"],
- "angle": node["angleRad"],
- "air_type": node["nairtype"],
- "from": {"agg": layer_aggregation(global_process((node["nfrom"]["x"], node["nfrom"]["z"])), self.model),
- "div": layer_divide(global_process((node["nfrom"]["x"], node["nfrom"]["z"])),
- self.layer_map[node["nlayerid"]])
- },
- "to": {"agg": layer_aggregation(global_process((node["nto"]["x"], node["nto"]["z"])), self.model),
- "div": layer_divide(global_process((node["nto"]["x"], node["nto"]["z"])),
- self.layer_map[node["nlayerid"]])},
- "name": node["strname"],
- "vec12": {"agg": [], "div": []},
- "vec34": {"agg": [], "div": []}
- }
- for n in node["vec12"]:
- agg_point = layer_aggregation(global_process((n["x"], n["z"])), self.model)
- tun["vec12"]["agg"].append(agg_point)
- for n in node["layerVec12"]:
- div_point = layer_divide(global_process((n["x"], n["z"])), self.layer_map[node["nlayerid"]])
- tun["vec12"]["div"].append(div_point)
- for n in node["vec34"]:
- agg_point = layer_aggregation(global_process((n["x"], n["z"])), self.model)
- tun["vec34"]["agg"].append(agg_point)
- for n in node["layerVec34"]:
- div_point = layer_divide(global_process((n["x"], n["z"])), self.layer_map[node["nlayerid"]])
- tun["vec34"]["div"].append(div_point)
- if tun["type"] != "1" and len(tun["vec12"]) > 1 and len(tun["vec34"]) > 1:
- seg1 = tun["vec12"]['div'][0], tun["vec12"]['div'][-1]
- seg2 = tun["vec34"]['div'][0], tun["vec34"]['div'][-1]
- tun["width"] = core.min_distance_between_segments(seg1, seg2)
- tun["route"] = {
- "agg": core.calculate_angle_with_x_axis(tun['from']['agg'],
- tun['to']['agg']),
- "div": core.calculate_angle_with_x_axis(tun['from']['div'],
- tun['to']['div'])
- }
- else:
- tun["route"] = {
- "agg": 0,
- "div": 0
- }
- tun["width"] = 3.1415936
- tun_dict[tun["tun_id"]] = tun
- return tun_dict
- def get_tuns(self):
- node_list = self.json["tunsMap"]
- tun_list = []
- for node in tqdm(node_list, desc=' 【巷道读取中】'):
- node = node[1]
- if check_node(node) is False: continue
- # node['vec12'] = node['layerVec12']
- # node['vec34'] = node['layerVec34']
- tun = {
- "tun_id": node["ntunid"],
- "layer_id": node["nlayerid"],
- "width": node["fwidth"],
- "type": node["tunType"],
- "air_type": node["nairtype"],
- "angle": node["angleRad"],
- "from": {"agg": layer_aggregation(global_process((node["nfrom"]["x"], node["nfrom"]["z"])), self.model),
- "div": layer_divide(global_process((node["nfrom"]["x"], node["nfrom"]["z"])),
- self.layer_map[node["nlayerid"]])
- },
- "to": {"agg": layer_aggregation(global_process((node["nto"]["x"], node["nto"]["z"])), self.model),
- "div": layer_divide(global_process((node["nto"]["x"], node["nto"]["z"])),
- self.layer_map[node["nlayerid"]])},
- "name": node["strname"],
- "vec12": {"agg": [], "div": []},
- "vec34": {"agg": [], "div": []}
- }
- for n in node["vec12"]:
- agg_point = layer_aggregation(global_process((n["x"], n["z"])), self.model)
- tun["vec12"]["agg"].append(agg_point)
- for n in node["layerVec12"]:
- div_point = layer_divide(global_process((n["x"], n["z"])), self.layer_map[node["nlayerid"]])
- tun["vec12"]["div"].append(div_point)
- for n in node["vec34"]:
- agg_point = layer_aggregation(global_process((n["x"], n["z"])), self.model)
- tun["vec34"]["agg"].append(agg_point)
- for n in node["layerVec34"]:
- div_point = layer_divide(global_process((n["x"], n["z"])), self.layer_map[node["nlayerid"]])
- tun["vec34"]["div"].append(div_point)
- if tun["type"] != "1" and len(tun["vec12"]) > 1 and len(tun["vec34"]) > 1:
- seg1 = tun["vec12"]['div'][0], tun["vec12"]['div'][-1]
- seg2 = tun["vec34"]['div'][0], tun["vec34"]['div'][-1]
- tun["width"] = core.min_distance_between_segments(seg1, seg2)
- tun["route"] = {
- "agg": core.calculate_angle_with_x_axis(tun['from']['agg'],
- tun['to']['agg']),
- "div": core.calculate_angle_with_x_axis(tun['from']['div'],
- tun['to']['div'])
- }
- else:
- tun["route"] = {
- "agg": 0,
- "div": 0
- }
- tun["width"] = 3.1415936
- print(tun['air_type'])
- tun_list.append(tun)
- return tun_list
- def get_windows(self):
- layer_map = self.json["layerMap"]
- window_list = []
- for layer in tqdm(layer_map, desc=' 【风窗读取中】'):
- windows = layer[1]["windows"]
- for w in windows:
- tun = self.tun_dict.get(w["ntunid"])
- if tun is None: continue
- # tun = [tun_ for tun_ in self.tun_list if tun_.get("tun_id") == w["ntunid"]][0]
- window = {"layer": w["nlayerid"], "color": core.get_color_by_layer(w["nlayerid"]),
- "name": w["strname"],
- "id": w["id"],
- "center": {
- "agg": layer_aggregation(global_process((w['x'], w['z'])), self.model),
- "div": layer_divide(global_process((w['x'], w['z'])),
- self.layer_map[w["nlayerid"]])
- },
- 'route': {"agg": tun["route"]['agg'],
- "div": tun["route"]['div']
- },
- 'width': tun['width']
- }
- window_list.append(window)
- return window_list
- def get_fans(self):
- layer_map = self.json["layerMap"]
- fans_list = []
- for layer in tqdm(layer_map, desc=' 【风扇读取中】'):
- fans = layer[1]["fans"]
- for f in fans:
- tun = self.tun_dict.get(f["ntunid"])
- if tun is None: continue
- fan = {"layer": f["nlayerid"], "color": core.get_color_by_layer(f["nlayerid"]),
- "type": f["strtype"],
- "name": f["strname"],
- "id": f["id"],
- "center": {
- "agg": layer_aggregation(global_process((f['x'], f['z'])), self.model),
- "div": layer_divide(global_process((f['x'], f['z'])), self.layer_map[f["nlayerid"]])
- },
- 'route': {"agg": tun["route"]['agg'],
- "div": tun["route"]['div']
- },
- 'width': tun['width']
- }
- fans_list.append(fan)
- return fans_list
- def get_gates(self):
- layer_map = self.json["layerMap"]
- gate_list = []
- for layer in tqdm(layer_map, desc=' 【风门读取中】'):
- gates = layer[1]["gates"]
- for g in gates:
- tun = self.tun_dict.get(g["ntunid"])
- if tun is None: continue
- gate = {"layer": g["nlayerid"], "color": core.get_color_by_layer(g["nlayerid"]),
- "type": g["strtype"],
- "name": g["strname"],
- "id": g["id"],
- "center": {
- "agg": layer_aggregation(global_process((g['x'], g['z'])), self.model),
- "div": layer_divide(global_process((g['x'], g['z'])), self.layer_map[g["nlayerid"]])
- },
- 'route': {"agg": tun["route"]['agg'],
- "div": tun["route"]['div']
- },
- 'width': tun['width']
- }
- gate_list.append(gate)
- return gate_list
- def calculate_route_middle(self, path, layer_id):
- path_start = global_process((path[0]['x'], path[0]['z']))
- path_end = global_process((path[-1]['x'], path[-1]['z']))
- path_start_agg = layer_aggregation(path_start, self.model)
- path_end_agg = layer_aggregation(path_end, self.model)
- path_start_div = layer_divide(path_start, self.layer_map[layer_id])
- path_end_div = layer_divide(path_end, self.layer_map[layer_id])
- path_middle_agg = core.find_point_on_line(path_start_agg, path_end_agg, 1 / 2)
- path_middle_div = core.find_point_on_line(path_start_div, path_end_div, 1 / 2)
- route_agg = core.calculate_angle_with_x_axis(path_start_agg, path_end_agg)
- route_div = core.calculate_angle_with_x_axis(path_start_div, path_end_div)
- return {"middle": {"agg": path_middle_agg, "div": path_middle_div},
- "route": {"agg": route_agg, "div": route_div}}
- def calculate_segment_route_middle(self, vector, layer_id):
- path_start = global_process(vector[0])
- path_end = global_process(vector[1])
- path_start_agg = layer_aggregation(path_start, self.model)
- path_end_agg = layer_aggregation(path_end, self.model)
- path_start_div = layer_divide(path_start, self.layer_map[layer_id])
- path_end_div = layer_divide(path_end, self.layer_map[layer_id])
- path_middle_agg = core.find_point_on_line(path_start_agg, path_end_agg, 1 / 2)
- path_middle_div = core.find_point_on_line(path_start_div, path_end_div, 1 / 2)
- route_agg = core.calculate_angle_with_x_axis(path_start_agg, path_end_agg)
- route_div = core.calculate_angle_with_x_axis(path_start_div, path_end_div)
- return {"middle": {"agg": path_middle_agg, "div": path_middle_div},
- "route": {"agg": route_agg, "div": route_div}}
- def get_air_flow(self):
- air_flow_list = []
- for tun in tqdm(self.tun_dict.values(), desc='【风流方向读取中】'):
- vec_agg = tun['from']['agg'], tun['to']['agg']
- vec_div = tun["from"]["div"], tun["to"]["div"]
- distance = core.distance(vec_agg[0], vec_agg[1])
- if 50 < distance < 100:
- middle_agg = core.find_point_on_line(vec_agg[0], vec_agg[1], 1 / 2)
- middle_div = core.find_point_on_line(vec_div[0], vec_div[1], 1 / 2)
- air_flow = {
- "layer": tun['layer_id'],
- "center": {
- "agg": middle_agg,
- "div": middle_div
- },
- "route": {"agg": tun["route"]['agg'],
- "div": tun['route']['div']},
- "type": tun['air_type']
- }
- air_flow_list.append(air_flow)
- else:
- space = int(distance // 100)
- agg_points = core.divide_segment(vec_agg[0], vec_agg[1], space)[1:-1]
- div_points = core.divide_segment(vec_div[0], vec_div[1], space)[1:-1]
- for i in range(len(agg_points)):
- air_flow = {
- "layer": tun['layer_id'],
- "center": {
- "agg": agg_points[i],
- "div": div_points[i]
- },
- "route": {"agg": tun["route"]['agg'],
- "div": tun['route']['div']},
- "type": tun['air_type']
- }
- air_flow_list.append(air_flow)
- return air_flow_list
- # def get_wind_flow(self):
- # path_point_map = self.json["pathLineMap"]
- # wind_flow_list = {
- # "in": [],
- # "no": [],
- # "out": [],
- # "use": []
- # }
- # for path_point in tqdm(path_point_map, desc=' 【风流方向读取中】'):
- # in_paths = path_point[1]['inPaths']
- # for path in in_paths:
- # middle_route = self.calculate_route_middle(path, path_point[0])
- # in_path = {
- # "layer": path_point[0],
- # "center": {
- # "agg": middle_route["middle"]["agg"],
- # "div": middle_route["middle"]["div"]
- # },
- # "route": {"agg": middle_route["route"]["agg"],
- # "div": middle_route["route"]["div"]},
- # "color": "3",
- # "type": "3"
- # }
- # wind_flow_list["in"].append(in_path)
- #
- # no_paths = path_point[1]['noPaths']
- # for path in no_paths:
- # middle_route = self.calculate_route_middle(path, path_point[0])
- # no_path = {
- # "layer": path_point[0],
- # "center": {
- # "agg": middle_route["middle"]["agg"],
- # "div": middle_route["middle"]["div"]
- # },
- # "route": {"agg": middle_route["route"]["agg"],
- # "div": middle_route["route"]["div"]},
- # "color": "3",
- # "type": "3"
- # }
- # wind_flow_list["no"].append(no_path)
- #
- # out_paths = path_point[1]['outPaths']
- # for path in out_paths:
- # middle_route = self.calculate_route_middle(path, path_point[0])
- # out_path = {
- # "layer": path_point[0],
- # "center": {
- # "agg": middle_route["middle"]["agg"],
- # "div": middle_route["middle"]["div"]
- # },
- # "route": {"agg": middle_route["route"]["agg"],
- # "div": middle_route["route"]["div"]},
- # "color": "1",
- # "type": "1"
- # }
- # wind_flow_list["out"].append(out_path)
- # use_paths = path_point[1]['usePaths']
- # for path in use_paths:
- # middle_route = self.calculate_route_middle(path, path_point[0])
- # use_path = {
- # "layer": path_point[0],
- # "center": {
- # "agg": middle_route["middle"]["agg"],
- # "div": middle_route["middle"]["div"]
- # },
- # "route": {"agg": middle_route["route"]["agg"],
- # "div": middle_route["route"]["div"]},
- # "color": "1",
- # "type": "1"
- # }
- # wind_flow_list["use"].append(use_path)
- #
- # return wind_flow_list
- def get_wind_bridge(self):
- layer_map = self.json["layerMap"]
- wind_bridge_list = []
- for layer in tqdm(layer_map, desc=' 【风桥读取中】'):
- cross_nodes = layer[1]['crossNodes']
- for cross_node in cross_nodes:
- center = cross_node['crossPoint']['x'], -cross_node['crossPoint']['y']
- # tun = [tun_ for tun_ in self.tun_list if tun_.get("tun_id") == cross_node['tun1Id']][0]
- tun = self.tun_dict.get(cross_node["tun1Id"])
- if tun is None: continue
- wind_bridge = {
- 'layer': layer[0],
- 'color': core.get_color_by_layer(layer[0]),
- "center": {
- "agg": layer_aggregation(global_process(center), self.model),
- "div": layer_divide(global_process(center), self.layer_map[layer[0]])
- },
- 'route': {"agg": tun["route"]['agg'],
- "div": tun['route']['div']},
- 'width': tun['width'] * 1.5114514
- }
- wind_bridge_list.append(wind_bridge)
- return wind_bridge_list
|