def computeDistances(layout):
"Runs UCS to all other positions from each position"
distances = {}
allNodes = layout.walls.asList(False)
for source in allNodes:
dist = {}
closed = {}
for node in allNodes:
dist[node] = sys.maxint
import util
queue = util.PriorityQueue()
queue.push(source, 0)
dist[source] = 0
while not queue.isEmpty():
node = queue.pop()
if node in closed:
continue
closed[node] = True
nodeDist = dist[node]
adjacent = []
x, y = node
if not layout.isWall((x,y+1)):
adjacent.append((x,y+1))
if not layout.isWall((x,y-1)):
adjacent.append((x,y-1) )
if not layout.isWall((x+1,y)):
adjacent.append((x+1,y) )
if not layout.isWall((x-1,y)):
adjacent.append((x-1,y))
for other in adjacent:
if not other in dist:
continue
oldDist = dist[other]
newDist = nodeDist+1
if newDist < oldDist:
dist[other] = newDist
queue.push(other, newDist)
for target in allNodes:
distances[(target, source)] = dist[target]
return distances
python类PriorityQueue()的实例源码
def uniformCostSearch(problem):
"Search the node of least total cost first. "
frontier = PriorityQueue()
startNode = Node((problem.getStartState(), None, None))
#Check if start node is goal
if problem.isGoalState(startNode.state):
return []
for successors in problem.getSuccessors(problem.getStartState()):
newNode = Node(successors, startNode)
frontier.push(newNode, 0)
explored = list()
explored.append(startNode.state)
while not frontier.isEmpty():
leafNode = frontier.pop()
if problem.isGoalState(leafNode.state):
return leafNode.getPath()
explored.append(leafNode.state)
for successor in problem.getSuccessors(leafNode.state):
newNode = Node(successor, leafNode)
if newNode.state not in frontier.stateList and newNode.state not in explored:
frontier.push(newNode, newNode.pathCost)
return []
def aStarSearch(problem, heuristic=nullHeuristic):
"Search the node that has the lowest combined cost and heuristic first."
# Use the genericSearch method, with the fringe maintained with a
# PriorityQueue. The cost is calculated using the provided heuristic.
# If no heuristic is given (such as UCS), then default to the given
# nullHeuristic
return genericSearch(problem, util.PriorityQueue(), heuristic)
# Abbreviations
def uniformCostSearch(problem):
"""Search the node of least total cost first."""
"*** YOUR CODE HERE ***"
startState = problem.getStartState()
visited = set()
actions = []
fringe = util.PriorityQueue()
fringe.push((startState, None, None, actions), 0)
while not fringe.isEmpty():
currPath = fringe.pop()
currState = currPath[0]
action = currPath[1]
stepCost = currPath[2]
actions = currPath[3]
if problem.isGoalState(currState):
return actions
if not currState in visited:
visited.add(currState)
paths = problem.getSuccessors(currState)
for path in paths:
if not path[0] in visited:
newActions = list(actions)
newActions.append(path[1])
fringe.push((path[0],path[1],path[2],newActions), problem.getCostOfActions(newActions))
util.raiseNotDefined()
def aStarSearch(problem, heuristic=nullHeuristic):
"""Search the node that has the lowest combined cost and heuristic first."""
"*** YOUR CODE HERE ***"
startState = problem.getStartState()
visited = set()
fringe = util.PriorityQueue()
fringe.push((startState, ()), heuristic(startState,problem))
while not fringe.isEmpty():
currNode = fringe.pop()
currState = currNode[0]
currPlan = currNode[1]
if problem.isGoalState(currState):
return list(currPlan)
if not currState in visited:
visited.add(currState)
paths = problem.getSuccessors(currState)
for path in paths:
newPlan = list(currPlan)
newPlan.append(path[1])
nextNode = (path[0], tuple(newPlan))
if not path[0] in visited:
fringe.push(nextNode, heuristic(path[0],problem)
+ problem.getCostOfActions(newPlan))
# Abbreviations
def uniformCostSearch(problem):
"""Search the node of least total cost first."""
"*** YOUR CODE HERE ***"
"we use PriorityQueue data structure, 'smart queue' "
"struture and help us to find the lower cost of action"
"use PriorityQueue to detect which route is shortest, and pop the shortest route"
nodePriorityQueue = util.PriorityQueue()
visited = []
path = []
totalCost = 0
startNode =(problem.getStartState(),path)
nodePriorityQueue.push((startNode),totalCost)
"start while loop to find the correct path"
while nodePriorityQueue.isEmpty() is False:
node,path = nodePriorityQueue.pop()
visited.append(node)
if problem.isGoalState(node):
return path
for successor, direction, cost in problem.getSuccessors(node) :
if successor not in visited:
visited.append(successor)
newNode =(successor,path + [direction])
nodePriorityQueue.push(newNode,problem.getCostOfActions(path + [direction]))
if problem.isGoalState(successor):
newNode =(successor,path + [direction])
nodePriorityQueue.push(newNode,problem.getCostOfActions(path + [direction]))
return None
util.raiseNotDefined()
def aStarSearch(problem, heuristic=nullHeuristic):
"""Search the node that has the lowest combined cost and heuristic first."""
"*** YOUR CODE HERE ***"
"it is very similar to UCS, just change the totalcost to the heuristic + cost "
nodePriorityQueue = util.PriorityQueue()
visited = []
path = []
totalCost = heuristic(problem.getStartState(),problem)
startNode =(problem.getStartState(),path)
nodePriorityQueue.push((startNode),totalCost)
"start while loop to find the correct path"
while nodePriorityQueue.isEmpty() is False:
node,path = nodePriorityQueue.pop()
visited.append(node)
if problem.isGoalState(node):
return path
for successor, direction, cost in problem.getSuccessors(node) :
if successor not in visited:
visited.append(successor)
newNode =(successor,path+[direction])
totalCost = problem.getCostOfActions(path + [direction]) + heuristic(successor,problem)
nodePriorityQueue.push(newNode,totalCost)
if problem.isGoalState(successor):
newNode =(successor,path + [direction])
totalCost = problem.getCostOfActions(path + [direction]) + heuristic(successor,problem)
nodePriorityQueue.push(newNode,totalCost)
return None
util.raiseNotDefined()
# Abbreviations
def uniformCostSearch(problem):
"Search the node of least total cost first. "
"*** YOUR CODE HERE ***"
frontier = util.PriorityQueue()
visited = []
startNode = ((problem.getStartState(), None, 0), [], 0)
frontier.push(startNode, None)
while not frontier.isEmpty():
curr = frontier.pop()
currLoc = curr[0][0]
currDir = curr[0][1]
currPath = curr[1]
currCost = curr[2]
if currLoc not in visited:
visited.append(currLoc)
if(problem.isGoalState(currLoc)):
return currPath
successors = problem.getSuccessors(currLoc)
successorsList = list(successors)
for i in successorsList:
if i[0] not in visited:
if(problem.isGoalState(i[0])):
return currPath + [i[1]]
newNode = (i, currPath+[i[1]], currCost + i[2])
frontier.push(newNode, currCost + i[2])
return []
def aStarSearch(problem, heuristic=nullHeuristic):
"Search the node that has the lowest combined cost and heuristic first."
"*** YOUR CODE HERE ***"
frontier = util.PriorityQueue()
visited = []
h = heuristic(problem.getStartState(), problem)
g = 0
f = g + h
startingNode = (problem.getStartState(), None, g, []);
frontier.push(startingNode, f)
while not frontier.isEmpty():
curr = frontier.pop()
currLoc = curr[0]
currDir = curr[1]
currCost = curr[2]
if currLoc not in visited:
currPath = curr[3]
visited.append(currLoc)
successors = problem.getSuccessors(currLoc)
successorsList = list(successors)
for i in successorsList:
if i[0] not in visited:
if(problem.isGoalState(i[0])):
return currPath + [i[1]]
h = heuristic(i[0], problem)
g = currCost + i[2]
f = g + h
newNode = (i[0], i[1], g, currPath+[i[1]])
frontier.push(newNode, f)
return []
# Abbreviations
def uniformCostSearch(problem):
loc_pqueue = PriorityQueue()
visited_node = {}
parent_child_map = {}
direction_list = []
path_cost = 0
start_node = problem.getStartState()
parent_child_map[start_node] = []
loc_pqueue.push(start_node,path_cost)
def traverse_path(parent_node):
while True:
map_row = parent_child_map[parent_node]
if (len(map_row) == 3):
parent_node = map_row[0]
direction = map_row[1]
direction_list.append(direction)
else:
break
return direction_list
while (loc_pqueue.isEmpty() == False):
parent_node = loc_pqueue.pop()
if (parent_node != problem.getStartState()):
path_cost = parent_child_map[parent_node][2]
if (problem.isGoalState(parent_node)):
pathlist = traverse_path(parent_node)
pathlist.reverse()
return pathlist
elif (visited_node.has_key(parent_node) == False):
visited_node[parent_node] = []
sucessor_list = problem.getSuccessors(parent_node)
no_of_child = len(sucessor_list)
if (no_of_child > 0):
temp = 0
while (temp < no_of_child):
child_nodes = sucessor_list[temp]
child_state = child_nodes[0];
child_action = child_nodes[1];
child_cost = child_nodes[2];
gvalue = path_cost + child_cost
if (visited_node.has_key(child_state) == False):
loc_pqueue.push(child_state,gvalue)
if (parent_child_map.has_key(child_state) == False):
parent_child_map[child_state] = [parent_node,child_action,gvalue]
else:
if (child_state != start_node):
stored_cost = parent_child_map[child_state][2]
if (stored_cost > gvalue):
parent_child_map[child_state] = [parent_node,child_action,gvalue]
temp = temp + 1
def uniformCostSearch(problem):
loc_pqueue = PriorityQueue()
visited_node = {}
parent_child_map = {}
direction_list = []
path_cost = 0
start_node = problem.getStartState()
parent_child_map[start_node] = []
loc_pqueue.push(start_node,path_cost)
def traverse_path(parent_node):
while True:
map_row = parent_child_map[parent_node]
if (len(map_row) == 3):
parent_node = map_row[0]
direction = map_row[1]
direction_list.append(direction)
else:
break
return direction_list
while (loc_pqueue.isEmpty() == False):
parent_node = loc_pqueue.pop()
if (parent_node != problem.getStartState()):
path_cost = parent_child_map[parent_node][2]
if (problem.isGoalState(parent_node)):
pathlist = traverse_path(parent_node)
pathlist.reverse()
return pathlist
elif (visited_node.has_key(parent_node) == False):
visited_node[parent_node] = []
sucessor_list = problem.getSuccessors(parent_node)
no_of_child = len(sucessor_list)
if (no_of_child > 0):
temp = 0
while (temp < no_of_child):
child_nodes = sucessor_list[temp]
child_state = child_nodes[0];
child_action = child_nodes[1];
child_cost = child_nodes[2];
gvalue = path_cost + child_cost
if (visited_node.has_key(child_state) == False):
loc_pqueue.push(child_state,gvalue)
if (parent_child_map.has_key(child_state) == False):
parent_child_map[child_state] = [parent_node,child_action,gvalue]
else:
if (child_state != start_node):
stored_cost = parent_child_map[child_state][2]
if (stored_cost > gvalue):
parent_child_map[child_state] = [parent_node,child_action,gvalue]
temp = temp + 1