Commit 7dff71d4 authored by 张晓彤's avatar 张晓彤

添加实时调度, 添加代码注释

parent 88ffe739
......@@ -46,116 +46,6 @@ class CurrentTruck:
return self._group_id
class RuleHandle:
""" class for the schedule rule handler.
Description:
基于调度规则对调度价值进行修正
Attribute:
equipment class: truck, excavator, dump, group
"""
def __init__(self, dump, excavator, truck, group):
self._dump = dump
self._excavator = excavator
self._truck = truck
self._group = group
self._priority_control = PriorityController()
self._schedule_alg = ScheduleAlg(dump, excavator, truck, group)
def filter(self, current_truck):
"""
计算调度规则综合影响调度价值结果
:param truck_id: 矿卡uuid
:return:
"""
# 矿卡uuid
truck_id = current_truck.get_truck_id()
# 矿卡序号
truck_index = self.truck.truck_uuid_to_index_dict[truck_id]
# 矿卡行程
trip = current_truck.get_trip()
# 矿卡任务
task = current_truck.get_task()
# 1. 读取路网权重和可通行性
walk_to_excavator_weight, walk_to_excavator_weight, park_walk_weight \
= self._priority_control.weighted_walk_calc()
walk_available = self._priority_control.walk_available_calc()
# 2. 读取挖机禁止关系
# excavator_exclude_truck_factor = self._truck.update_truck_excavator_exclude()
#
# excavator_exclude_truck_factor[np.where(excavator_exclude_truck_factor > M / 2)] = 1
#
# excavator_exclude_truck_factor = 1 - excavator_exclude_truck_factor
excavator_exclude_modify = self._group.group_excavator_exclude_modify[truck_id]
# 3. 读取调度价值
transport_value = self._schedule_alg.truck_schedule(current_truck, "congestion")
transport_value_norm = np.max(transport_value) / transport_value
transport_value_norm = transport_value_norm * walk_available
# def route_modify_factor(self, truck_id):
# """
# 规则及权重因子针对某个矿卡的待的各路线的影响
# :param truck_id:
# :return:
# """
# walk_to_excavator_weight, walk_to_excavator_weight, park_walk_weight \
# = self._priority_control.weighted_walk_calc()
# walk_available = self._priority_control.walk_available_calc()
def truck_walk_weight(self, current_truck):
"""
道路权重性针对某个矿卡到的各目的地的影响
:param current_truck:
:return:
"""
walk_to_excavator_weight, walk_to_dump_weight, park_walk_weight \
= self._priority_control.weighted_walk_calc()
if current_truck.get_task() == 2:
return park_walk_weight
elif current_truck.get_task() in [0, 1, 2]:
end_excavator = int(current_truck.get_trip[1])
return walk_to_dump_weight[end_excavator, :]
elif current_truck.get_trip() in [3, 4, 5]:
end_dump = int(current_truck.get_trip[1])
return walk_to_excavator_weight[end_dump, :]
def truck_walk_available(self, current_truck):
"""
道路通行性针对某个矿卡到的各目的地的影响
:param current_truck: 当前请调矿卡
:return: 矿卡出发地到各目的地道路可通行性
"""
walk_available = self._priority_control.walk_available_calc()
if current_truck.get_task() == 2:
return np.ones(get_value("excavator_num"))
elif current_truck.get_task() in [0, 1, 2]:
end_excavator = int(current_truck.get_trip[1])
return walk_available[:, end_excavator].reshape(1, -1)
elif current_truck.get_trip() in [3, 4, 5]:
end_dump = int(current_truck.get_trip[1])
return walk_available[end_dump, :].reshape(1, -1)
class Dispatcher(WalkManage):
""" class for the truck dispatch.
......@@ -170,14 +60,14 @@ class Dispatcher(WalkManage):
"""
def __init__(self, dump, excavator, truck, predict_schedule):
def __init__(self, dump, excavator, truck, predict_schedule, request_mode=False):
# 运行模式
self.request_mode = request_mode
# 调度开始时间
self.start_time = datetime.now()
# 路径规划对象
self.path = PathPlanner()
# 车流对象
# self.traffic_flow = Traffic_flow(dump, excavator, truck)
self.traffic_flow = None
......@@ -193,6 +83,12 @@ class Dispatcher(WalkManage):
self.excavator = excavator
self.truck = truck
# # 路径规划对象
# self.path = PathPlanner(dump, excavator, dump)
# 调度结果输出器
self.submission = DispatchSubmission(dump, excavator, truck, self.group)
# 调度算法
self._schedule_alg = ScheduleAlg(dump, excavator, truck, self.group, self.pre_sch)
......@@ -200,8 +96,11 @@ class Dispatcher(WalkManage):
self.logger = get_logger("zxt.dispatcher")
def dispatcher_period_update(self):
global_period_para_update()
"""
控制全局调度信息更新
"""
if not self.request_mode:
global_period_para_update()
# 更新卸载设备对象
self.dump.dump_para_period_update()
......@@ -217,13 +116,19 @@ class Dispatcher(WalkManage):
# # 更新实时车流
# self.traffic_flow.update_actual_traffic_flow()
# 获取路网加权行驶成本
self.cost_to_excavator, self.cost_to_dump, self.cost_park_to_excavator = self.path.walk_cost_cal()
# # 获取路网加权行驶成本
# self.cost_to_excavator, self.cost_to_dump, self.cost_park_to_excavator = self.path.walk_cost_cal()
# 调度分组更新
self.group.period_update()
def truck_schedule(self, truck_id):
"""
按照指定规则及算法, 为请调车辆分配派车计划
:param truck_id: (uuid) 请调车辆id
:return:
target: (int) 请调车辆目的地id
"""
# 规则读取
rule3 = session_mysql.query(DispatchRule).filter_by(id=3).first().disabled
......@@ -383,7 +288,7 @@ class Dispatcher(WalkManage):
excavator_uuid_to_name_dict = get_value("excavator_uuid_to_name_dict")
self.logger.info(f"目的地:{excavator_uuid_to_name_dict[self.excavator.excavator_index_to_uuid_dict[target]]}")
if task in [0, 1]: # 矿卡空载行驶或正在入场
if task in [0, 1]: # 矿卡空载行驶或正在入场
################################################ 矿卡空载 ###############################################
try:
......@@ -400,7 +305,7 @@ class Dispatcher(WalkManage):
self.logger.info(f'空载trip {trip}')
if truck_id in self.truck.truck_material_bind:
self.logger.info(f'物料类型 {self.truck.truck_material_bind[truck_id]}')
self.logger.info(f'驶往卸点的运输成本 {self.cost_to_dump}')
# self.logger.info(f'驶往卸点的运输成本 {self.cost_to_dump}')
self.logger.info("卸点物料修正")
self.logger.info(self.truck.dump_material_bind_modify)
......@@ -408,7 +313,6 @@ class Dispatcher(WalkManage):
self.logger.info("矿卡行程信息异常")
self.logger.info(es)
try:
# 1. 绑定调度
......@@ -460,7 +364,7 @@ class Dispatcher(WalkManage):
dump_uuid_to_name_dict = get_value("dump_uuid_to_name_dict")
self.logger.info(f"目的地:{dump_uuid_to_name_dict[self.dump.dump_index_to_uuid_dict[target]]}")
elif task in [3, 4]: # 卡车重载行驶或正在入场
elif task in [3, 4]: # 卡车重载行驶或正在入场
################################################ 矿卡重载 ###############################################
try:
......@@ -480,8 +384,8 @@ class Dispatcher(WalkManage):
self.logger.info("物料类型")
if truck_id in self.truck.truck_material_bind:
self.logger.info(self.truck.truck_material_bind[truck_id])
self.logger.info("驶往挖机的运输成本")
self.logger.info(self.cost_to_excavator)
# self.logger.info("驶往挖机的运输成本")
# self.logger.info(self.cost_to_excavator)
self.logger.info("挖机物料修正")
self.logger.info(self.truck.excavator_material_bind_modify)
self.logger.info("挖机优先级修正")
......@@ -496,8 +400,8 @@ class Dispatcher(WalkManage):
if truck_id in self.truck.truck_excavator_bind:
target = self.excavator.excavator_uuid_to_index_dict[self.truck.truck_excavator_bind[truck_id]]
self.logger.info("矿卡已绑定挖机")
self.logger.info("cost_to_excavator")
self.logger.info(self.cost_to_excavator)
# self.logger.info("cost_to_excavator")
# self.logger.info(self.cost_to_excavator)
# 2 正常调度
elif rule3 and rule4:
......@@ -545,9 +449,14 @@ class Dispatcher(WalkManage):
# except Exception as es:
# self.logger.error("truck_schedule,error")
# self.logger.error(es)
return target
def schedule_construct(self):
"""
读取调度所需信息, 依次为动态派车车辆请求派车计划, 并写入redis缓存
:return: None
"""
global truck
global excavator
......@@ -562,9 +471,6 @@ class Dispatcher(WalkManage):
truck_current_trip = self.truck.get_truck_current_trip()
truck_current_task = self.truck.get_truck_current_task()
# Seq初始化
Seq = [[truck_current_trip[i][1], -1] for i in range(dynamic_truck_num)]
# 获取矿卡最早可用时间
truck_avl_time = self.pre_sch.get_truck_avl_time()
......@@ -586,64 +492,54 @@ class Dispatcher(WalkManage):
# 对于在线矿卡已经赋予新的派车计划,更新其最早可用时间,及相关设备时间参数
for truck_index in index:
if len(Seq[truck_index]) > 0:
# try:
# 获取矿卡id
try:
truck_id = self.truck.truck_index_to_uuid_dict[truck_index]
except Exception as es:
self.logger.error('error01')
self.logger.error(es)
try:
self.truck_request(self.truck.truck_index_to_uuid_dict[truck_index])
# 判断矿卡是否禁用
if truck_id in self.truck.update_truck_disable_list():
continue
try:
# 获取矿卡当前任务
task = truck_current_task[self.truck.truck_index_to_uuid_dict[truck_index]]
except Exception as es:
self.logger.error('error02')
self.logger.error(es)
except Exception as es:
self.logger.error("")
self.logger.error(es)
try:
# 矿卡结束当前派车计划后的目的地
end_eq_index = truck_current_trip[truck_index][1]
except Exception as es:
self.logger.error('error03')
self.logger.error(es)
# # submission将调度结果写入redis
# self.submission.submit_to_redis(Seq)
try:
# 调用调度函数,得到最优目的地序号
target_eq_index = self.truck_schedule(self.truck.truck_index_to_uuid_dict[truck_index])
def truck_request(self, truck_id):
except Exception as es:
self.logger.error('调度算法计算异常')
self.logger.error(es)
self.logger.error("truck_index,uuid")
self.logger.error(truck_index)
self.logger.error(self.truck.truck_index_to_uuid_dict[truck_index])
truck_index = self.truck.truck_uuid_to_index_dict[truck_id]
self.logger.info("target_eq_index")
self.logger.info(target_eq_index)
try:
# 写入Seq序列
Seq[truck_index][1] = target_eq_index
except Exception as es:
self.logger.error('error05')
self.logger.error(es)
self.logger.error("target_eq_index")
self.logger.error(target_eq_index)
self.logger.error("target_eq_index,type")
self.logger.error(type(target_eq_index))
# except Exception as es:
# self.logger.error("truck,task,end_eq_index,error")
# self.logger.error(es)
try:
group_id = self.group.dispatch_truck_group[truck_id]
except Exception as es:
self.logger.error("非动态调度矿卡")
self.logger.error(es)
truck_current_trip = self.truck.get_truck_current_trip()
truck_dispatch_seq = [truck_current_trip[truck_index][1], -1]
target_eq_index = self.truck_schedule(self.truck.truck_index_to_uuid_dict[truck_index])
truck_dispatch_seq[1] = target_eq_index
self.submission.truck_dispatch_to_redis(truck_id, truck_dispatch_seq)
class DispatchSubmission:
""" class for the submission calculated dispatch.
Description:
将调度结果按照指定格式传递到云端机群
Attribute:
"""
def __init__(self, dump, excavator, truck, group):
self.logger = self.logger = get_logger("zxt.submission")
self.dump = dump
self.excavator = excavator
self.truck = truck
self.group = group
def submit_to_redis(self, Seq):
"""
将调度结果输出到redis
:param Seq: (List[int]) 调度结果列表
:return: None
"""
for i in range(len(Seq)):
try:
......@@ -657,7 +553,7 @@ class Dispatcher(WalkManage):
self.logger.error("调度结果写入异常-读取矿卡信息异常(uuid, group id, task)")
self.logger.error(es)
if task in [0, 1, 2]: # 卡车空载或在装载区出场前, 可变更卸载目的地
if task in [0, 1, 2]: # 卡车空载或在装载区出场前, 可变更卸载目的地
try:
item = (
session_mysql.query(Dispatch)
......@@ -690,7 +586,7 @@ class Dispatcher(WalkManage):
redis5.set(self.truck.truck_index_to_uuid_dict[i], str(json.dumps(record)))
except Exception as es:
self.logger.error("调度结果写入异常-矿卡空载")
elif task in [3, 4, 5]: # 卡车重载或在卸载区出场前, 可变更装载目的地
elif task in [3, 4, 5]: # 卡车重载或在卸载区出场前, 可变更装载目的地
try:
item = (
session_mysql.query(Dispatch)
......@@ -784,14 +680,148 @@ class Dispatcher(WalkManage):
# self.logger.error("更新不及时-1")
# self.logger.error(es)
return Seq
def truck_dispatch_to_redis(self, truck_id, dispatch_seq):
"""
将truck_id对应矿卡派车计划写入redis
:param truck_id: (uuid) 矿卡uuid
:param dispatch_seq: (List[int]) 矿卡派车计划
:return: None
"""
try:
try:
group_id = self.group.dispatch_truck_group[truck_id]
record = {"truckId": truck_id}
task = self.truck.get_truck_current_task()[truck_id]
except Exception as es:
self.logger.error("调度结果写入异常-读取矿卡信息异常(uuid, group id, task)")
self.logger.error(es)
if task in [0, 1, 2]: # 卡车空载或在装载区出场前, 可变更卸载目的地
try:
item = (
session_mysql.query(Dispatch)
.filter_by(dump_id=DeviceMap.dump_index_to_uuid_dict[dispatch_seq[1]],
exactor_id=DeviceMap.excavator_index_to_uuid_dict[dispatch_seq[0]],
truck_id=truck_id,
group_id=group_id,
isauto=1, isdeleted=0, ).first())
if item is None:
raise Exception("调度计划表与实时监控不匹配")
except Exception as es:
self.logger.error(es)
item = (
session_mysql.query(Dispatch)
.filter_by(truck_id=truck_id,
# group_id=group_id,
isauto=1, isdeleted=0, ).first())
try:
record["exactorId"] = item.exactor_id
record["dumpId"] = item.dump_id
record["loadAreaId"] = item.load_area_id
record["unloadAreaId"] = item.unload_area_id
record["dispatchId"] = item.id
record["isdeleted"] = False
record["creator"] = item.creator
record["createtime"] = item.createtime.strftime(
"%b %d, %Y %I:%M:%S %p")
redis5.set(truck_id, str(json.dumps(record)))
except Exception as es:
self.logger.error("调度结果写入异常-矿卡空载")
elif task in [3, 4, 5]: # 卡车重载或在卸载区出场前, 可变更装载目的地
try:
item = (
session_mysql.query(Dispatch)
.filter_by(exactor_id=DeviceMap.excavator_index_to_uuid_dict[dispatch_seq[1]],
dump_id=DeviceMap.dump_index_to_uuid_dict[dispatch_seq[0]],
truck_id=truck_id,
group_id=group_id,
isauto=1, isdeleted=0, ).first())
if item is None:
raise Exception("调度计划表与实时监控不匹配")
except Exception as es:
self.logger.error(es)
item = (
session_mysql.query(Dispatch)
.filter_by(truck_id=truck_id,
# group_id=group_id,
isauto=1, isdeleted=0, ).first())
try:
record["exactorId"] = self.excavator.excavator_index_to_uuid_dict[dispatch_seq[1]]
record["dumpId"] = item.dump_id
record["loadAreaId"] = item.load_area_id
record["unloadAreaId"] = item.unload_area_id
record["dispatchId"] = item.id
record["isdeleted"] = False
record["creator"] = item.creator
record["createtime"] = item.createtime.strftime(
"%b %d, %Y %I:%M:%S %p")
redis5.set(truck_id, str(json.dumps(record)))
except Exception as es:
self.logger.error("调度结果写入异常-矿卡重载")
elif task == -2:
try:
try:
item = (
session_mysql.query(Dispatch)
.filter_by(exactor_id=DeviceMap.excavator_index_to_uuid_dict[dispatch_seq[1]],
truck_id=truck_id,
group_id=group_id,
isauto=1, isdeleted=0).first())
if item is None:
raise Exception("调度计划表与实时监控不匹配")
self.logger.info(dispatch_seq)
self.logger.info(dispatch_seq[1])
self.logger.info(DeviceMap.excavator_index_to_uuid_dict[dispatch_seq[1]])
self.logger.info("item")
print(item.id, item.truck_id, item.exactor_id, item.dump_id)
except Exception as es:
self.logger.error(es)
item = (
session_mysql.query(Dispatch)
.filter_by(truck_id=truck_id,
# group_id=group_id,
isauto=1, isdeleted=0).first())
try:
record["exactorId"] = item.exactor_id
record["dumpId"] = item.dump_id
record["loadAreaId"] = item.load_area_id
record["unloadAreaId"] = item.unload_area_id
record["dispatchId"] = item.id
record["isdeleted"] = False
record["creator"] = item.creator
record["createtime"] = item.createtime.strftime(
"%b %d, %Y %I:%M:%S %p")
redis5.set(truck_id, str(json.dumps(record)))
except Exception as es:
self.logger.error("调度结果写入异常-矿卡故障或备停区-redis写入异常")
self.logger.error(es)
except Exception as es:
self.logger.error("调度结果写入异常-矿卡故障或备停区")
self.logger.error(es)
else:
pass
except Exception as es:
self.logger.error("调度结果写入异常")
self.logger.error(f"调度结果:{dispatch_seq}")
self.logger.error(es)
class PreSchedule:
""" class for the prediction of equipments' trip.
Description:
Calculate and update the prediction item.
负责处理所有预测项的计算与更新
基于矿卡最近一次装卸载时间预测其抵达目的地时间
根据矿卡请求队列及抵达信息,计算设备最早可用时间
......
......@@ -11,7 +11,18 @@ from para_config import *
from path_plan.path_plannner import PathPlanner
from traffic_flow.traffic_flow_planner import traffic_flow_plan
class Group(WalkManage):
""" class of truck group dispatch processing.
Description:
管理车辆调度分组,计算分组调度所需信息
Attribute:
equipment class: truck, excavator, dump
group info: group num, group set, group walk cost ...
path planner class
"""
def __init__(self, dump, excavator, truck, traffic_flow):
self.dump = dump
self.excavator = excavator
......@@ -19,6 +30,7 @@ class Group(WalkManage):
self.traffic_flow = traffic_flow
self.dispatch_truck_group = {}
self.group_dispatch_truck = {}
self.group_num = 1
self.group_set = set()
self.device_group = {}
......@@ -40,14 +52,22 @@ class Group(WalkManage):
self.group_excavator_material_bind_modify = {}
self.group_dump_material_bind_modify = {}
self.path = PathPlanner()
self.path = PathPlanner(dump, excavator, truck)
self.logger = get_logger("zxt.group_control")
def update_dispatch_truck_group(self):
"""
更新车辆所属分组 -> (dispatch_truck_group)
"""
# 更新矿卡-调度分组隶属关系
self.dispatch_truck_group = {}
self.group_dispatch_truck = {}
dynamic_truck_set = get_value("dynamic_truck_set")
print("dispatch_truck_group-dynamic_truck_set")
print(dynamic_truck_set)
self.logger.info("dispatch_truck_group-dynamic_truck_set")
self.logger.info(dynamic_truck_set)
# 动态派车数量没变,但是此时某条派车计划被删除,dispatch_truck_group 就会缺失矿卡
for truck_id in dynamic_truck_set:
item = session_mysql.query(Dispatch).filter_by(truck_id=truck_id, isauto=1, isdeleted=0).first()
......@@ -55,10 +75,19 @@ class Group(WalkManage):
print(truck_id)
continue
self.dispatch_truck_group[truck_id] = item.group_id
if item.group_id not in self.group_dispatch_truck:
self.group_dispatch_truck[item.group_id] = [truck_id]
else:
self.group_dispatch_truck[item.group_id].append(truck_id)
print(self.dispatch_truck_group)
self.logger.info("truck_id <-> group_id")
self.logger.info(self.dispatch_truck_group)
self.logger.info(self.group_dispatch_truck)
def update_group_set(self):
"""
更新分组集合 -> (group_set, group_num)
"""
# 更新调度组
self.group_set = set()
for item in session_mysql.query(Dispatch).filter_by(isauto=1, isdeleted=0).all():
......@@ -67,6 +96,9 @@ class Group(WalkManage):
self.group_num = len(self.group_set)
def update_device_group(self):
"""
更新分组所包含挖机及卸点 -> (device_group)
"""
# 更新设备分组group_id -> {set(dump_id), set(excavator_id)}
self.device_group = {}
for group_id in self.get_group_set():
......@@ -78,8 +110,10 @@ class Group(WalkManage):
self.device_group[group_id][0].add(item.dump_id)
self.device_group[group_id][1].add(item.exactor_id)
# 更新实际交通流
def update_actual_traffic_flow(self):
"""
更新实时交通流 -> (actual_traffic_flow)
"""
loading_task_time = self.excavator.get_loading_task_time()
......@@ -105,14 +139,14 @@ class Group(WalkManage):
)
# try:
logger.info("dynamic_truck_num")
logger.info(dynamic_truck_num)
self.logger.info("dynamic_truck_num")
self.logger.info(dynamic_truck_num)
print("truck.truck_index_to_uuid_dict")
print(self.truck.truck_index_to_uuid_dict)
self.logger.info("truck.truck_index_to_uuid_dict")
self.logger.info(self.truck.truck_index_to_uuid_dict)
print("truck_current_task")
print(truck_current_task)
self.logger.info("truck_current_task")
self.logger.info(truck_current_task)
for i in range(dynamic_truck_num):
task = truck_current_task[DeviceMap.truck_index_to_uuid_dict[i]]
......@@ -130,27 +164,21 @@ class Group(WalkManage):
self.actual_goto_dump_traffic_flow = self.actual_goto_dump_traffic_flow / (
self.distance_to_dump.reshape(dynamic_excavator_num, dynamic_dump_num)
/ (1000 * empty_speed)
+ np.expand_dims(unloading_task_time, axis=0).repeat(
dynamic_excavator_num, axis=0
)
)
+ np.expand_dims(unloading_task_time, axis=0).repeat( dynamic_excavator_num, axis=0))
self.actual_goto_excavator_traffic_flow = (
self.actual_goto_excavator_traffic_flow
/ (
self.distance_to_excavator.reshape(
dynamic_dump_num, dynamic_excavator_num
)
self.actual_goto_excavator_traffic_flow / (
self.distance_to_excavator.reshape(dynamic_dump_num, dynamic_excavator_num)
/ (1000 * heavy_speed)
+ np.expand_dims(loading_task_time, axis=0).repeat(
dynamic_dump_num, axis=0
)
+ np.expand_dims(loading_task_time, axis=0).repeat(dynamic_dump_num, axis=0)
)
)
def update_group_truck_flow(self):
# 更新调度分组内车实时/最佳车流
"""
更新调度分组内车实时/最佳车流 -> (group_opt_goto_traffic_flow, group_actual_goto_traffic_flow)
!!! 目前该部分未使用
"""
global dispatcher
......@@ -161,10 +189,6 @@ class Group(WalkManage):
try:
# print("uuid_to_index_dict")
# print(dump.dump_uuid_to_index_dict)
# print(excavator.excavator_uuid_to_index_dict)
for group_id in self.group_set:
dump_group = self.device_group[group_id][0] # group 类最后更新,读取派车计划及分组情况,和前面的uuid 可能不一致
excavator_group = self.device_group[group_id][1]
......@@ -195,16 +219,17 @@ class Group(WalkManage):
self.group_actual_goto_dump_traffic_flow[group_id] = local_actual_goto_dump_traffic_flow
self.group_actual_goto_excavator_traffic_flow[group_id] = local_actual_goto_excavator_traffic_flow
except Exception as es:
logger.error(es)
logger.error("分组车流更新异常")
self.logger.error(es)
self.logger.error("分组车流更新异常")
logger.info("group_opt_traffic_flow")
logger.info(self.group_opt_goto_dump_traffic_flow)
logger.info(self.group_opt_goto_excavator_traffic_flow)
self.logger.info("group_opt_traffic_flow")
self.logger.info(self.group_opt_goto_dump_traffic_flow)
self.logger.info(self.group_opt_goto_excavator_traffic_flow)
def update_group_walk_cost(self):
# 更新调度分组路网行驶成本
"""
更新调度分组路网行驶成本 -> (group_walk_cost)
"""
walk_to_excavator_cost, walk_to_dump_cost, park_to_excavator_cost = self.path.walk_cost_cal()
......@@ -234,17 +259,17 @@ class Group(WalkManage):
local_park_to_excavator_cost[park_index][excavator_group_index] = \
park_to_excavator_cost[park_index][DeviceMap.excavator_uuid_to_index_dict[excavator_id]]
print("here1")
print(park_to_excavator_cost)
self.group_walk_to_excavator_cost[group_id] = local_walk_to_excavator_cost
self.group_walk_to_dump_cost[group_id] = local_walk_to_dump_cost
self.group_park_to_excavator[group_id] = local_park_to_excavator_cost
except Exception as es:
logger.info(es)
logger.info("error-11")
self.logger.info(es)
self.logger.info("error-11")
def update_group_device_map(self):
"""
更新调度分组内设备映射 -> (group_uuid_to_index_dict, group_index_to_uuid_dict)
"""
# 更新调度分组内设备映射
self.group_excavator_uuid_to_index_dict = {}
self.group_dump_uuid_to_index_dict = {}
......@@ -277,24 +302,23 @@ class Group(WalkManage):
dump_num = dump_num + 1
logger.info("group_map")
logger.info(self.group_dump_uuid_to_index_dict)
logger.info(self.group_excavator_uuid_to_index_dict)
self.logger.info("group_map")
self.logger.info(self.group_dump_uuid_to_index_dict)
self.logger.info(self.group_excavator_uuid_to_index_dict)
def update_modify(self):
"""
更新分组内车辆锁定-禁止-物料绑定修正 ->
(group_excavator_exclude_modify group_excavator_material_bind_modify group_dump_material_bind_modify)
"""
try:
dynamic_truck_set = get_value("dynamic_truck_set")
print("update_modify")
print(dynamic_truck_set)
print("self.dispatch_truck_group")
print(self.dispatch_truck_group)
self.group_excavator_exclude_modify = {}
self.group_excavator_material_bind_modify = {}
self.group_dump_material_bind_modify = {}
for truck_id in dynamic_truck_set:
group_id = self.dispatch_truck_group[truck_id]
group_dump_num = len(self.device_group[group_id][0])
......@@ -324,15 +348,16 @@ class Group(WalkManage):
self.group_excavator_material_bind_modify[truck_id] = excavator_material_bind_modify
self.group_dump_material_bind_modify[truck_id] = dump_material_bind_modify
except Exception as es:
logger.error(es)
logger.error("modify update 异常")
self.logger.error(es)
self.logger.error("modify update 异常")
def update_excavator_hold_truck(self, excavator_hold_truck):
'''
更新调度分组内挖机保有矿卡
:param excavator_hold_truck:
"""
更新调度分组内挖机hold矿卡
:param excavator_hold_truck: (List[int]) 挖机hold矿卡数量列表
:return:
'''
group_excavator_hold_truck: (Dict(group_id: List[int])) 分组内挖机hold矿卡数量列表
"""
group_excavator_hold_truck = {}
for group_id in self.group_set:
......@@ -346,6 +371,12 @@ class Group(WalkManage):
return group_excavator_hold_truck
def update_dump_hold_truck(self, dump_hold_truck):
"""
更新调度分组内卸点hold矿卡
:param dump_hold_truck: (List[int]) 卸点hold矿卡数量列表
:return:
group_dump_hold_truck: (Dict(group_id: List[int])) 分组内卸点hold矿卡数量列表
"""
group_dump_hold_truck = {}
for group_id in self.group_set:
......@@ -359,11 +390,12 @@ class Group(WalkManage):
return group_dump_hold_truck
def update_excavator_avl_time(self, excavator_avl_time):
'''
"""
更新调度分组内挖机可用时间
:param excavator_hold_truck:
:param excavator_avl_time: (List[int]) 挖机可用时间列表
:return:
'''
group_excavator_avl_time: (Dict(group_id: List[int])) 分组内挖机可用时间列表
"""
group_excavator_avl_time= {}
for group_id in self.group_set:
......@@ -377,12 +409,13 @@ class Group(WalkManage):
return group_excavator_avl_time
def update_dump_avl_time(self, dump_avl_time):
'''
更新调度分组内挖机可用时间
:param dump_hold_truck:
"""
更新调度分组内卸点可用时间
:param dump_avl_time: (List[int]) 卸点可用时间列表
:return:
'''
group_dump_avl_time= {}
group_dump_avl_time: (Dict(group_id: List[int])) 分组内卸点可用时间列表
"""
group_dump_avl_time = {}
for group_id in self.group_set:
dump_group = self.device_group[group_id][0]
......@@ -395,6 +428,11 @@ class Group(WalkManage):
return group_dump_avl_time
def update_allow_flow_to_excavator(self):
"""
更新分组内最大允许驶往挖机车流
:return:
group_allow_flow_to_excavator: (Dict(group_id: List[int])) 分组内最大允许驶往挖机车流
"""
group_allow_flow_to_excavator = {}
for group_id in self.group_set:
......@@ -408,6 +446,11 @@ class Group(WalkManage):
return group_allow_flow_to_excavator
def update_allow_flow_to_dump(self):
"""
更新分组内最大允许驶往卸点车流
:return:
group_allow_flow_to_dump (Dict(group_id: List[int])) 分组内最大允许驶往卸点车流
"""
group_allow_flow_to_dump = {}
for group_id in self.group_set:
......@@ -421,6 +464,9 @@ class Group(WalkManage):
return group_allow_flow_to_dump
def period_update(self):
"""
周期更新分组信息
"""
self.reset()
self.update_dispatch_truck_group()
self.update_group_set()
......
......@@ -16,15 +16,18 @@ M = 1000000
class PathPlanner(WalkManage):
def __init__(self):
def __init__(self, dump, excavator, truck):
# 路段类
self.lane = LaneInfo()
self.lane.lane_speed_generate()
# 设备类
self.dump = DumpInfo()
self.excavator = ExcavatorInfo()
self.truck = TruckInfo(self.dump, self.excavator)
self.truck.update_truck_size()
# self.dump = DumpInfo()
# self.excavator = ExcavatorInfo()
# self.truck = TruckInfo(self.dump, self.excavator)
self.dump = dump
self.excavator = excavator
self.truck = truck
# self.truck.update_truck_size()
# 控制类
self.controller = PriorityController(self.dump, self.excavator, self.truck)
......@@ -55,7 +58,7 @@ class PathPlanner(WalkManage):
self.cost_to_dump = np.zeros_like(get_value("distance_to_dump"))
self.cost_park_to_excavator = np.zeros_like(get_value("distance_park_to_excavator"))
def path_cost_generate(self, load_area_id, unload_area_id, is_park):
def path_cost_generate(self, load_area_id, unload_area_id, is_park, lane_cost_memory, alpha, beta):
# 卸载道路阻塞成本初始化
cost_to_unload_blockage = 0
......@@ -70,23 +73,9 @@ class PathPlanner(WalkManage):
# 修正因子
weight = 10
# 阻塞成本权重
alpha = 0
# 距离成本权重
beta = 1
session_mysql.commit()
# 距离成本启用
rule1 = session_mysql.query(DispatchRule).filter_by(id=1).first()
if rule1.disabled == 0:
beta = rule1.rule_weight
# 拥堵成本启用
rule2 = session_mysql.query(DispatchRule).filter_by(id=2).first()
if rule2.disabled == 0:
alpha = rule2.rule_weight
if alpha > 0:
beta /= (beta + 0.001)
alpha = alpha / beta * weight
......@@ -103,27 +92,44 @@ class PathPlanner(WalkManage):
# 读取道路路段信息
for lane_id in path.park_load_lanes:
# 各路段阻塞成本累加
cost_to_load_blockage = cost_to_load_blockage + beta * self.lane_cost_generate(lane_id)
if lane_id in self.lane.used_lane_set:
if lane_id in lane_cost_memory:
lane_cost = lane_cost_memory[lane_id]
else:
lane_cost = self.lane_cost_generate(lane_id)
lane_cost_memory[lane_id] = lane_cost
# 各路段阻塞成本累加
cost_to_load_blockage = cost_to_load_blockage + beta * lane_cost
# 道路总成本=道路距离成本+道路阻塞成本
to_load_cost = alpha * cost_to_load_blockage + beta * path.park_load_distance
else:
path = session_postgre.query(WalkTime).filter_by(load_area_id=load_area_id,
unload_area_id=unload_area_id).first()
for lane_id in path.to_unload_lanes:
cost_to_unload_blockage = cost_to_unload_blockage + self.lane_cost_generate(lane_id)
if lane_id in self.lane.used_lane_set:
if lane_id in lane_cost_memory:
lane_cost = lane_cost_memory[lane_id]
else:
lane_cost = self.lane_cost_generate(lane_id)
lane_cost_memory[lane_id] = lane_cost
cost_to_unload_blockage = cost_to_unload_blockage + lane_cost
for lane_id in path.to_load_lanes:
cost_to_load_blockage = cost_to_load_blockage + self.lane_cost_generate(lane_id)
if lane_id in self.lane.used_lane_set:
if lane_id in lane_cost_memory:
lane_cost = lane_cost_memory[lane_id]
else:
lane_cost = self.lane_cost_generate(lane_id)
lane_cost_memory[lane_id] = lane_cost
cost_to_load_blockage = cost_to_load_blockage + lane_cost
to_unload_cost = alpha * cost_to_unload_blockage + beta * path.to_unload_distance
to_load_cost = alpha * cost_to_load_blockage + beta * path.to_load_distance
# print("拥堵因子-挖机")
# print(alpha, cost_to_load_blockage)
# print("拥堵因子-卸点")
# print(alpha, cost_to_unload_blockage)
except Exception as es:
logger.error(f'道路{load_area_id + "-" + unload_area_id}行驶成本计算异常')
logger.error(es)
......@@ -131,15 +137,20 @@ class PathPlanner(WalkManage):
return to_load_cost, to_unload_cost
def lane_cost_generate(self, lane_id):
""" 计算路段拥堵成本
:param lane_id: (uuid) 路段id
:return:
lane_blockage: (float) 路段拥堵度
"""
lane_blockage = 0 # 路段拥堵度默认为0
try:
lane_rec = session_postgre.query(Lane).filter_by(Id=lane_id).first() # 读取路段记录
# 读取路段记录
lane_rec = session_postgre.query(Lane).filter_by(Id=lane_id).first()
lane_length = lane_rec.Length # 道路长度
# 道路长度
lane_length = lane_rec.Length
# 车辆自由行驶时的速度
clear_speed = lane_rec.MaxSpeed
clear_speed = lane_rec.MaxSpeed # 车辆自由行驶时的速度
# 1. 计算阻塞时车辆密度=路段长度/车辆长度
truck_density = lane_length / self.truck_length
......@@ -154,19 +165,29 @@ class PathPlanner(WalkManage):
return lane_blockage
def walk_cost_cal(self):
"""
计算路网行驶成本
:return:
cost_to_excavator: (Matrix[int]) 卸载区驶往装载区行驶成本
cost_to_dump: (Matrix[int]) 装载区驶往卸载区行驶成本
cost_park_to_excavator: (Matrix[int]) 备停区驶往装载区行驶成本
"""
self.excavator.excavator_para_period_update()
alpha = 0 # 阻塞成本权重
self.dump.dump_para_period_update()
beta = 1 # 距离成本权重
self.truck.truck_para_period_update(self.dump, self.excavator)
lane_cost_memory = {} # 路段拥堵度列表, 记忆化搜索
self.truck.state_period_update()
# self.controller.period_update(self.dump, self.excavator, self.truck)
# 距离成本启用
rule1 = session_mysql.query(DispatchRule).filter_by(id=1).first()
if rule1.disabled == 0:
beta = rule1.rule_weight
# 计算行驶成本前,更新路网速度信息
self.lane.lane_speed_generate()
# 拥堵成本启用
rule2 = session_mysql.query(DispatchRule).filter_by(id=2).first()
if rule2.disabled == 0:
alpha = rule2.rule_weight
try:
# 读取路网成本
......@@ -176,14 +197,20 @@ class PathPlanner(WalkManage):
load_area_index = load_area_uuid_to_index_dict[load_area_id]
self.cost_to_load_area[unload_area_index][load_area_index], \
self.cost_to_unload_area[unload_area_index][load_area_index] = \
self.path_cost_generate(load_area_id, unload_area_id, False)
self.path_cost_generate(load_area_id, unload_area_id, False, lane_cost_memory, alpha, beta)
# self.cost_to_load_area[unload_area_index][load_area_index] = \
# WalkManage.distance_to_load_area[unload_area_index][load_area_index]
# self.cost_to_unload_area[unload_area_index][load_area_index] = \
# WalkManage.distance_to_unload_area[unload_area_index][load_area_index]
# 读取备停区路网成本
for walk_time_park in session_postgre.query(WalkTimePark).all():
park_area_index = park_uuid_to_index_dict[str(walk_time_park.park_area_id)]
load_area_index = load_area_uuid_to_index_dict[str(walk_time_park.load_area_id)]
self.cost_park_to_load_area[park_area_index][load_area_index], _ = \
self.path_cost_generate(str(walk_time_park.load_area_id), str(walk_time_park.park_area_id), True)
self.path_cost_generate(str(walk_time_park.load_area_id), str(walk_time_park.park_area_id), True, lane_cost_memory, alpha, beta)
# self.cost_park_to_load_area[park_area_index][load_area_index] = \
# WalkManage.distance_park_to_load_area[park_area_index][load_area_index]
logger.info(self.cost_park_to_load_area)
logger.info(self.distance_park_to_excavator)
......@@ -191,6 +218,9 @@ class PathPlanner(WalkManage):
logger.error('路网信息计成本计算异常')
logger.error(es)
# 基于工作区路网行驶成本, 更新设备间逻辑路网行驶成本
# 调度算法以设备间逻辑路网作为调度的依据
self.cost_to_excavator = np.zeros_like(get_value("distance_to_excavator"))
self.cost_to_dump = np.zeros_like(get_value("distance_to_dump"))
self.cost_park_to_excavator = np.zeros_like(get_value("distance_park_to_excavator"))
......@@ -199,10 +229,8 @@ class PathPlanner(WalkManage):
logger.info(self.distance_park_to_excavator)
# try:
# 路网权重
walk_to_excavator_weight, walk_to_dump_weight, park_walk_weight = self.controller.weighted_walk_calc()
# # 路网禁用关系
# walk_available = self.controller.walk_available_calc()
# group_walk_available = self.controller.update_group_walk_available()
......@@ -234,7 +262,6 @@ class PathPlanner(WalkManage):
for j in range(get_value("dynamic_excavator_num")):
load_area_index = self.excavator_index_to_load_area_index_dict[j]
self.cost_park_to_excavator[0][j] = self.cost_park_to_load_area[0][load_area_index] / park_walk_weight[0][j]
print()
logger.info("真实路网距离-驶往挖机:")
logger.info(self.distance_to_excavator)
......@@ -259,12 +286,17 @@ class PathPlanner(WalkManage):
class LaneInfo:
def __init__(self):
self.lane_speed_dict = {}
self.lane_speed_dict = {} # 路段速度表
self.used_lane_set = [] # 存在矿卡路段表
def update_truck_speed(self):
# 读取矿卡实时速度信息
"""读取矿卡实时速度信息
:return:
truck_speed_dict: (Dict{key:truck_id, value:speed}) 矿卡速度表
"""
truck_speed_dict = {}
try:
truck_speed_dict = {}
device_name_set = redis2.keys()
for item in device_name_set:
item = item.decode(encoding='utf-8')
......@@ -278,13 +310,14 @@ class LaneInfo:
logger.error(f'矿卡{item}实时速度读取异常')
logger.error(es)
print("truck_speed_dict")
print(truck_speed_dict)
return truck_speed_dict
def update_truck_loacate(self):
# 读取矿卡所在路段信息
"""读取矿卡所在路段信息
:return:
truck_locate_dict: (Dict{key:truck_id, value:lane_id}) 矿卡所在路段表
"""
self.used_lane_set = []
# try:
truck_locate_dict = {}
device_name_set = redis2.keys()
......@@ -301,85 +334,81 @@ class LaneInfo:
and (str_to_byte('laneId') in key_set):
truck_locate = key_value_dict[str_to_byte('laneId')]
truck_locate_dict[truck_name_to_uuid_dict[item]] = eval(truck_locate)
self.used_lane_set.append(eval(truck_locate))
# except Exception as es:
# logger.error(f'矿卡{item}所在路段信息读取异常')
# logger.error(es)
print("truck_locate_dict")
print(truck_locate_dict)
logger.info("truck_locate_dict")
logger.info(truck_locate_dict)
return truck_locate_dict
def lane_speed_generate(self):
"""计算存在矿卡路段实时速度
:return:
lane_speed_dict: (Dict{key:lane_id, value:avg_speed}) 各路段矿卡平均速度
"""
# truck -> lane
truck_locate_dict = self.update_truck_loacate()
truck_locate_dict = self.update_truck_loacate() # truck -> lane
logger.info("矿卡位于路段:")
logger.info(truck_locate_dict)
# truck -> speed
truck_speed_dict = self.update_truck_speed()
truck_speed_dict = self.update_truck_speed() # truck -> speed
logger.info("矿卡当前速度:")
logger.info(truck_speed_dict)
try:
# lane_set, 用到的路段集合
lane_set = []
for walk_time in session_postgre.query(WalkTime).all():
for lane in walk_time.to_load_lanes:
lane_set.append(lane)
for lane in walk_time.to_unload_lanes:
lane_set.append(lane)
for walk_time_park in session_postgre.query(WalkTimePark).all():
for lane in walk_time_park.park_load_lanes:
lane_set.append(lane)
lane_set = set(lane_set)
except Exception as es:
logger.error('所用路网路段集合读取异常')
logger.info(es)
# lane -> speed, 各路段平均行驶速度
self.lane_speed_dict = {}
# try:
# # lane_set, 用到的路段集合
# lane_set = []
# for walk_time in session_postgre.query(WalkTime).all():
# for lane in walk_time.to_load_lanes:
# lane_set.append(lane)
# for lane in walk_time.to_unload_lanes:
# lane_set.append(lane)
# for walk_time_park in session_postgre.query(WalkTimePark).all():
# for lane in walk_time_park.park_load_lanes:
# lane_set.append(lane)
# lane_set = set(lane_set)
# except Exception as es:
# logger.error('所用路网路段集合读取异常')
# logger.info(es)
# lane -> num, 各路段行驶车辆
lane_trucks_dict = {}
self.lane_speed_dict = {} # lane -> avg_speed
# used lane, 存在行驶矿卡的路段
tmp_lane_set = []
lane_trucks_dict = {} # lane -> truck_num
try:
# 初始化
for lane_id in lane_set:
for lane_id in self.used_lane_set:
self.lane_speed_dict[str(lane_id)] = 0
lane_trucks_dict[str(lane_id)] = 0
# 对于各路段信息
print("truck_locate_dict")
print(truck_locate_dict.keys())
for truck in truck_locate_dict.keys():
lane_id = truck_locate_dict[truck]
logger.info("lane_speed_generate-lane_id")
logger.info(lane_id)
if lane_id in lane_set and truck in truck_speed_dict and truck in truck_locate_dict:
self.lane_speed_dict[truck_locate_dict[truck]] = self.lane_speed_dict[truck_locate_dict[truck]] + \
truck_speed_dict[truck]
# 矿卡速度字段和位置字段存在
if truck in truck_speed_dict and truck in truck_locate_dict:
self.lane_speed_dict[truck_locate_dict[truck]] += truck_speed_dict[truck]
# 该路段矿卡数量加一
lane_trucks_dict[truck_locate_dict[truck]] = lane_trucks_dict[truck_locate_dict[truck]] + 1
# 记录存在行驶矿卡的路段
tmp_lane_set.append(lane_id)
lane_trucks_dict[truck_locate_dict[truck]] += 1
# # 记录存在行驶矿卡的路段
# self.used_lane_set.append(lane_id)
# 存在矿卡的路段
logger.info("存在矿卡的路段:")
logger.info(tmp_lane_set)
# 对不存在的矿卡路段,实时速度设置为最高
for lane_id in lane_set:
if lane_id not in tmp_lane_set:
self.lane_speed_dict[str(lane_id)] = session_postgre.query(Lane).filter_by(
Id=lane_id).first().MaxSpeed
lane_trucks_dict[str(lane_id)] = 1
logger.info(self.used_lane_set)
# # 对不存在的矿卡路段,实时速度设置为最高
# for lane_id in lane_set:
# if lane_id not in self.used_lane_set:
# self.lane_speed_dict[str(lane_id)] = session_postgre.query(Lane).filter_by(
# Id=lane_id).first().MaxSpeed
# lane_trucks_dict[str(lane_id)] = 1
# 各路段实时速度取平均
for lane in lane_trucks_dict:
......
......@@ -12,7 +12,7 @@ from equipment.excavator import *
from para_config import *
class PriorityController():
class PriorityController:
def __init__(self, dump, excavator, truck):
# 设备类
......
......@@ -18,6 +18,11 @@ from dispatcher import Dispatcher, PreSchedule
def process(dispatcher):
"""
周期调度进程
:param dispatcher: (Dispatch Class) 矿卡调度类对象
:return: None
"""
try:
# 更新周期参数
......@@ -41,14 +46,9 @@ def process(dispatcher):
# 周期更新
dispatcher.dispatcher_period_update()
# try:
# 调度计算
dispatcher.schedule_construct()
# except Exception as es:
# logger.error("更新不及时")
# logger.error(es)
logger.info("#####################################周期更新结束#####################################")
......@@ -101,7 +101,7 @@ if __name__ == "__main__":
pre_sch = PreSchedule(truck, excavator, dump)
# 实例化矿卡调度器
dispatcher = Dispatcher(dump, excavator, truck, pre_sch)
dispatcher = Dispatcher(dump, excavator, truck, pre_sch, False)
logger.info(" ")
logger.info("调度系统启动")
......
......@@ -121,7 +121,7 @@ def build_equipment_uuid_name_map():
raise Exception("无卸载设备可用")
except Exception as es:
logger.warning(es)
return excavator_uuid_to_name_dict,dump_uuid_to_name_dict
return excavator_uuid_to_name_dict, dump_uuid_to_name_dict
def update_deveices_map(unload_area_uuid_to_index_dict, load_area_uuid_to_index_dict):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment