Sharedelevator class.ipynbOpen in CoCalc
###### Elevator Simulation
## CS166: Modeling, Simulation & Decision Making
import math
import numpy as np
import random
import matplotlib.pyplot as plt

from scipy.stats import ttest_ind
from matplotlib import ticker
from copy import deepcopy

%matplotlib inline
plt.rcParams['figure.figsize'] = (18, 4)
# # Simulation parameters
# passenger_count = int(input("How many passengers will use the elevator?"))
# max_capacity = int(input("How many people can the elevator carry?"))
# n_floors = int(input("How many floors are there in the building?"))
# max_time = range(1800) # Time steps

passenger_count = 100
max_capacity = 5
n_floors = 20
max_time = range(1800) # Time steps

Definition of the elevator class

class Elevator:

    def __init__(self, max_floors = n_floors, default_floor = 1, capacity = max_capacity, logs = False):
        '''
        Initializes an elevator object. 
        Inputs:
        
         * max_floors (int) The number of floors in the building. 
         
         * default_floor (int) The floor the elevator returns to if
           there are no calls or requests.
           
         * capacity (int) The maximum number of people the elevator
           can carry at the same time.
           
         * logs (bool) Whether to print several debugging messages
           when running the simulation.
           
        '''

        self.max_floors = max_floors
        self.default_floor = default_floor
        self.current_floor = self.default_floor
        self.user_count = 0 # How many users are in the elevator.
        self.capacity = capacity #OR max_capacity = int(input("How many people can the elevator carry?"))
        self.call_pool = {} 
        self.calls = {} # which user pressed the button and attempting to go in which direction
        self.requests = {} # which buttons were pressed inside the elevator by whom
        self.direction = +1 # +1 if the elevator is going upward, -1 if going downward

##Nico's methods    
    
    def add_call(self):
        '''
        At each time step, there is a chance that will method will be called from
        the building method timestep(), adding a random user to the list of calls.
        '''
        # Choose a random user from pool. Assign the popped user to dictionary of elevator calls.
        calling_user = random.choice(list(self.call_pool.keys()))
        self.calls[calling_user] = self.call_pool.pop(calling_user) #remove one call from the call_pool, append it to the elevator calls
        return calling_user

    def stop(self):
        '''
        At each time step, this method is called to evaluated whether the elevator
        should stop at the current floor. Returns a boolean that answers the 
        question: Should the elevator stop?
        '''
        calls_copy = list(self.calls.values())
        calls_copy = [abs(call) for call in calls_copy]
        # Checks whether current floor is the desired destination of any users inside.
        if (self.current_floor in self.requests.values()):
            return True
        
        # Checks whether current floor and direction matches the call of any users outside, if capacity has not been reached.
        elif (self.direction*self.current_floor in self.calls.values()) and (self.user_count < self.capacity):
            return True
        
        elif (self.current_floor in calls_copy) and (self.user_count == 0):
            return True
        else:
            return False # Move elevator
    
    def swap(self, users, logs = False):
        '''
        At each time step, if the elevator stops, this method allows users to
        leave or enter the elevator.
        ''' 
        
        #we want to pop things from dictionary while we iterate
        calls_copy = deepcopy(self.calls)
        requests_copy = deepcopy(self.requests)
        
        # Allows people to leave, iterating over passengers and floors in the dictionary of requests.
        for passenger, floor in requests_copy.items():
            if floor == self.current_floor:
                self.requests.pop(passenger) # Return dropped passenger and remove it from queue of requests_over_time
                dropped_passenger = passenger
                # Question: Can't we just pop and use the object passenger to set .in_elevator to False?
                self.user_count -=1
                if logs: print('Passenger %d was dropped at floor %d.' %(passenger, self.current_floor))
                users[dropped_passenger].in_elevator = False # dropped passenger is now outside the elevator
        
        # Allows people to enter, iterating over passengers and floors in the dictionary of calls.
        for passenger, floor in calls_copy.items():
            if self.user_count == self.capacity: # If the elevator is full, break.
                break
            
            elif self.user_count == 0:
                if abs(floor) == self.current_floor:
                    self.calls.pop(passenger) # Return added passenger and remove it from queue of calls
                    added_passenger = passenger
                    self.user_count +=1
                    if logs: print('Passenger %d entered the elevator at floor %d.' %(passenger, self.current_floor))
                    users[added_passenger].in_elevator = True # Added passenger is now inside the elevator
                    self.requests[passenger] = users[passenger].destination # Add their destination to requests
                    if logs: print(users[passenger].destination,'DESTINATION OF PASSENGER',passenger)   
            
            elif floor == self.direction*self.current_floor: # If floor and direction match call
                self.calls.pop(passenger) # Return added passenger and remove it from queue of calls
                # Same question from loop above applies.
                added_passenger = passenger
                self.user_count +=1
                if logs: print('Passenger %d entered the elevator at floor %d.' %(passenger, self.current_floor))
                users[added_passenger].in_elevator = True # Added passenger is now inside the elevator
                self.requests[passenger] = users[passenger].destination # Add their destination to requests
                if logs: print(users[passenger].destination,'DESTINATION OF PASSENGER',passenger)

    def path(self):
        '''
        This strategy looks at each time step where it has to move next. 
        It takes into account the calls and requests, prioritizing requests over calls. 
        If there are no requests or calls it goes to the default (ground) floor.
        If there are no requests, but calls are present, it prioritizes the calls that are going to **sweep** the most floors while moving.
        
        '''

        # Create a list of all the called floors
        current_calls = list(self.calls.values())

        if self.current_floor >= self.max_floors: # >= for debugging
            self.direction = -1 # Direction is either -1 or 1

        # If there are no requests from inside the elevator, pursue the strategy
        elif len(self.requests) == 0:

            # If there are no calls, go to default floor
            if len(current_calls) == 0:
                if self.current_floor-self.default_floor > 0:
                    self.direction = -1
                # If already on default floor, stay
                else:
                    self.direction = 0

            # If there are calls outside the elevator
            else:
                #Get the min of all calls. Negative numbers are calls going down, so it will prioritize high floors going down
                negmax = min(current_calls)
                #Create a list with only the calls going up

                poscalls = [i for i in current_calls if i > 1]
                if len(poscalls)>0:
                    #Min of the list of calls going up, that way you end up with the lowest floor calling to go up
                    posmin = min(poscalls)
                #If there's only 1 call and no requests, go towards the called floor
                if len(current_calls) == 1:    
               
                    if current_calls[0]>0:
                        direction = self.current_floor - current_calls[0]
                        if direction >0:
                            self.direction = -1
                        else:
                            self.direction = +1
                    else:
                        direction = self.current_floor + current_calls[0]
                        if direction >0:
                            self.direction = -1
                        else:
                            self.direction = +1       
                           
                elif 1 in current_calls:
                    #If multiple calls, prioritize the top or ground floor. If both are being called, then go go the one closest to current floor.
                    if self.max_floors in current_calls:
                        if current_floor<=(0.5*self.max_floors):
                           
                            self.direction = -1
                        else:
                            self.direction = +1
                elif self.max_floors in current_calls:
                    self.direction = +1
                #If there are no calls going up just go towards the highest floor being called
                elif len(poscalls) == 0:
                    direction = self.current_floor + negmax
                    if direction > 0:
                        self.direction = -1
                    else:
                        self.direction = +1
                #If there are only calls going up, go towards the lowest up call.    
                elif posmin == negmax:
                    direction = self.current_floor - negmax
                    if direction > 0:
                        self.direction = -1
                    else:
                        self.direction = +1   
                #If both calls going up and down are present, choose the one that is closest to the ground floor or top floor, respectively.
                elif (posmin-1) < (self.max_floors + negmax):
                    if self.current_floor < posmin:
                        self.direction = +1
                    else:
                        self.direction = -1
                elif (posmin-1) > (self.max_floors + negmax):

                    if self.current_floor>(negmax*-1):
                        self.direction = -1
                    else:
                        self.direction = +1               
                else:
                    #If the distance between the calls going up and down to its 'roof floors' are the same, 
                    #prioritize the one closest to current floor
                    pos = abs(self.current_floor - posmin)
                    neg = abs(self.current_floor - negmax)
                    if pos<neg:
                        if (self.current_floor - posmin) >0:
                            self.direction = -1
                        else:
                            self.direction = +1
                    else:
                        if (self.current_floor + negmax) >0:
                            self.direction = -1
                        else:
                            self.direction = +1

        else:
            #If there are requests, go towards any requested floor, as they are all in the same direction.
            request1 = list(self.requests.values())
            direction = self.current_floor - request1[0]
            if direction < 0:
                self.direction = +1
            else:

                self.direction = -1


    def move(self, strategy = 1, logs = False):
        '''
        At each time step, move the elevator if it has not stopped.
        '''
                
        if self.direction == +1:
            self.current_floor += 1
        elif self.direction == -1:
            self.current_floor -= 1
            
        if self.direction == 1:
            dir_string = 'up'
        elif self.direction == -1:
            dir_string = 'down'
        elif self.direction == 0 :
            dir_string = 'No movement' 
            
        if logs: print('Elevator moves %s from floor %d carrying %d people.' %(dir_string, self.current_floor, self.user_count))
        
        if strategy == 1:
            if self.current_floor >= self.max_floors: #>= for debugging
                self.direction = -1 #direction is either -1 or 1
        
            if self.current_floor == 1:
                self.direction = +1


Definition of the building class

class Building:
   
    def __init__(self, elevator, users):
#     '''
#     Initializes the building object.
#     Inputs:
        
#      * max_floors (int) The number of floors in the building.
    
#      * elevator (Elevator) The elevator used in the simulation.
      
#      * capacity (list of Passenger) A list of users used in the sim.
       
#      * logs (bool) Whether to print several debugging messages
#        when running the simulation.
#     '''
    
    
        self.max_floors = elevator.max_floors #OR n_floors = int(input("How many floors are there in the building?"))
        self.elevator = elevator
        self.users = users
        
        # There arrays are storages for values to be plotted later
        self.calls_over_time = []
        self.requests_over_time = []
        self.count_over_time = []
        
    def timestep(self, elevator, users, strategy = 1, logs = False):
#         '''
#         This is the main control function of the simulation. It checks 
#         whether the elevator should stop or move. If it stops, it allows
#         users to come in and out of the elevator. If it moves, it computes
#         the appropriate path.
#         '''
        
        #print(elevator.user_count, elevator.current_floor*elevator.direction, elevator.calls) #check evolution of attributes
            
        # Check if elevator needs to stop
        if elevator.stop(): # If the elevator stops, do as below
            if logs: print('Elevator stops at %d.' %(elevator.current_floor))
            elevator.swap(users, logs = logs) # Add and drop passengers
        else: # If elevator needs to move do as follows
            if strategy == 2: elevator.path() # Check where it has to go

            elevator.move(logs = logs, strategy = strategy) # Move a floor up or down

        if random.random() < 0.1: #10% chance of adding a new call from the pool regardless of action taken
                try:
                    if logs: print('User %d called the elevator.' %(elevator.add_call()))
# Nico: Why did you add .add_call() in the else?
                    else: elevator.add_call()
                except:
                    pass
    
    def store_data(self, elevator):
#         '''
#         Store final attribute values.
#         '''
        self.calls_over_time.append(len(elevator.calls))
        self.requests_over_time.append(len(elevator.requests))
        self.count_over_time.append(elevator.user_count)

Definition of the passenger class


class Passenger:
    
    def __init__(self, id, elevator, seed, in_elevator = False): #setting default values, otherwise specifiable
#         '''
#         Initializes a passenger object. A passenger has an id, an origin,
#         a destination, a direction, and a tracker on whether they are in the elevator or not.
#         '''
        ifrandom.seed(id + seed) #im putting the initialization of the two attributes below so that it takes the seed change
        
        if random.random() < 0.3: # 30% chance the user will start from ground floor
            self.current_floor = 1
                destination = random.randint(1, elevator.max_floors)
            while (destination == 1):
            destination = random.randint(2, elevator.max_floors)
                self.destination = destination

        else: # 70% chance the user will start anywhere else
            self.current_floor = random.randint(2, elevator.max_floors)
    
            if random.random() < 0.3: # 30% chance the user will want to go to ground floor
                self.destination = 1
            else:
                    stination = random.randint(1, elevator.max_floors) # 70% chance the user will want to go anywhere else
                    while (destination == self.current_floor):
                destination = random.randint(2, elevator.max_floors)
        self.destination = destination
        

        self.id = id
        self.direction=np.sign(self.destination - self.current_floor) # Direction as the sign of the difference
        self.in_elevator = in_elevator
    
    def __str__(self):
        '''
        Define how a Passenger object should be printed
        '''
        in_string = '' # String to represent direction ('up' or 'down')
        if self.in_elevator == True: in_string = 'inside'
        else: in_string = 'outside'
        
        dir_string = ''
        if self.direction == 1: dir_string = 'up'
        elif self.direction == -1: dir_string = 'down'
        elif self.direction == 0: dir_string = 'nowhere (they are at destination)' #this shouldn't be necessary
        
        return('Passenger %d is %s the elevator, wanting to go %s from floor %d to floor %d.') %(self.id,in_string,dir_string,self.current_floor,self.destination)

Overarching loop

def simulate_elevator(passenger_count, n_floors, max_capacity, max_time, logs = False, seed = 1, strategy = 1):
    '''
    The main function initializes storage arrays and the main objects:
    The Elevator, the list of Passengers, and the Building.
    The main loop iterates over the array of time steps, storing
    the data of the current step and calling on the .timestep()
    method of the building object. The loop breaks if no one
    is using the elevator.
    Inputs:

     * passenger_count (int) The numbers of users in the simulation.

     * n_floors (int) The number of floors in the building.

     * max_capacity (int) The maximum number of people the elevator can hold.

     * max_time (int) The maximum number of time steps of the simulation.
     
     * logs (bool) Defaults to false. Whether to print several messages during
       execution to help with debugging.
       
     * seed (int) Defaults to 1. A seed that changes the results of the
       stochastic parts of the model.
     
     * strategy (int) Defaults to 1. 1 is a basic strategy where the elevator traverses
       the entire building before changing directions. 2 is a more complex strategy that
       optimizes the elevator's path. 
       
     Outputs:
     
     * (dictionary) Includes three arrays storing the time steps of the simmulation,
       the number of calls over time, and the number of requests over time.
     '''
    
    elevator = Elevator(max_floors = n_floors, capacity = max_capacity) # Initialize elevator
    # for line before, an alternative is: passenger_count = int(input("How many passengers will use the elevator?"))
    users = [Passenger(id = i, elevator = elevator, seed = seed) for i in range(passenger_count)] # Generate users
    building = Building(elevator = elevator, users = users) # Initialize building

    for user in users:
        elevator.call_pool[user.id] = user.current_floor * user.direction ## IMPORTANT: This replentishes the calling queue!!

    for time_step in max_time: #main loop
        building.store_data(elevator)
        building.timestep(elevator, users, strategy = strategy, logs = logs)
        if len(elevator.requests)!=elevator.user_count:
            if logs: print(len(elevator.requests),elevator.user_count)

        if (time_step > 50) and (len(elevator.calls)==0) and (len(elevator.requests)==0) and (len(elevator.call_pool)==0):
            if logs: print('Simulation over after %d timesteps.' %time_step)
            break #end simulation if no one is using or calling the elevator

    building.store_data(elevator)
    return {'timesteps' : time_step,'calls' : building.calls_over_time, 'requests' : building.requests_over_time}

Plot of passenger statistics

waiting = simulate_elevator(passenger_count, n_floors, max_capacity, max_time, strategy = 1)['calls']
inside = simulate_elevator(passenger_count, n_floors, max_capacity, max_time, strategy = 1)['requests']

plot_calls = plt.plot(waiting, label='Waiting for elevator')
plt.title('Trolley elevator')
plt.xlabel('Timestep')
plot_requests = plt.plot(inside, label='In elevator')
plt.xlabel('Timestep')
plt.text(s='Average in elevator: '+str(round(np.mean(inside),2)),y=5.5,x=0)
plt.text(s='Average waiting: '+str(round(np.mean(waiting),2)),y=5,x=0)
plt.legend(loc=1)
plt.show()

waiting = simulate_elevator(passenger_count, n_floors, max_capacity, max_time, strategy = 2)['calls']
inside = simulate_elevator(passenger_count, n_floors, max_capacity, max_time, strategy = 2)['requests']

plot_calls = plt.plot(waiting, label='Waiting for elevator')
plt.title('Smart elevator')
plt.xlabel('Timestep')
plot_requests = plt.plot(inside, label='In elevator')
plt.xlabel('Timestep')
plt.text(s='Average in elevator: '+str(round(np.mean(inside),2)),y=6,x=0)
plt.text(s='Average waiting: '+str(round(np.mean(waiting),2)),y=5,x=0)
plt.legend(loc=1)
plt.show()

timesteps_trolley = []
timesteps_smart = []


for _ in range(2,100):
    timesteps_trolley.append(simulate_elevator(passenger_count = 100, n_floors = _, max_capacity = max_capacity, max_time = max_time, strategy = 1)['timesteps'])
    
for _ in range(2,100):
    timesteps_smart.append(simulate_elevator(passenger_count = 100, n_floors = _, max_capacity = max_capacity, max_time = max_time, strategy = 2)['timesteps'])
    
plt.plot(range(2,100),timesteps_trolley,label='Trolley elevator')
plt.plot(range(2,100),timesteps_smart,label='Smart elevator')
plt.xlabel('Number of floors in the building')
plt.xticks(np.arange(0,100,5))
plt.ylabel('Timesteps required to finish simulation')
plt.legend(loc=1)
plt.show()


timesteps_trolley = []
timesteps_smart = []

for _ in range(120):
    timesteps_trolley.append(simulate_elevator(passenger_count = _, n_floors = n_floors, max_capacity = max_capacity, max_time = max_time, strategy = 1)['timesteps'])
    
for _ in range(120):
    timesteps_smart.append(simulate_elevator(passenger_count = _, n_floors = n_floors, max_capacity = max_capacity, max_time = max_time, strategy = 2)['timesteps'])

m_trolley,b_trolley = np.polyfit(range(120), timesteps_trolley, 1)
m_smart,b_smart = np.polyfit(range(120), timesteps_smart, 1)


    
plt.plot(range(120),timesteps_trolley,label='Trolley elevator',linewidth=0.50)
plt.plot([m_trolley*x + b_trolley for x in range(120)],color=u'#1f77b4',label= str(round(m_trolley,4))+'x +'+str(round(b_trolley,4)))
plt.plot(range(120),timesteps_smart,label='Smart elevator',linewidth=0.50)
plt.plot([m_smart*x + b_trolley for x in range(120)],color=u'#ff7f0e',label= str(round(m_smart,4))+'x +'+str(round(b_smart,4)))
plt.plot
plt.xlabel('Number of passengers in the building')
plt.ylabel('Timesteps required to finish simulation')
plt.legend(loc=1)
plt.show()
timesteps_trolley = []
timesteps_smart = []

for _ in range(30):
    timesteps_trolley.append(simulate_elevator(passenger_count = 100, n_floors = 20, max_capacity = _, max_time = max_time, strategy = 1)['timesteps'])
    
for _ in range(30):
    timesteps_smart.append(simulate_elevator(passenger_count = 100, n_floors = 20, max_capacity = _, max_time = max_time, strategy = 2)['timesteps'])
    
plt.plot(range(30),timesteps_trolley,label='Trolley elevator')
plt.plot(range(30),timesteps_smart,label='Smart elevator')
plt.xlabel('Elevator capacity')
plt.ylabel('Timesteps required to finish simulation')
plt.legend(loc=1)
plt.grid(True)
plt.show()

timesteps_trolley = []
timesteps_smart = []

for _ in range(200):
    timesteps_trolley.append(simulate_elevator(passenger_count = passenger_count, n_floors = n_floors, max_capacity = max_capacity, max_time = max_time, seed =_,strategy=1)['timesteps'])
    
for _ in range(200):
    timesteps_smart.append(simulate_elevator(passenger_count = passenger_count, n_floors = n_floors, max_capacity = max_capacity, max_time = max_time, seed =_, strategy=2)['timesteps'])
    
plt.hist(timesteps_trolley,label='Trolley elevator',bins=9,alpha=0.5)
plt.hist(timesteps_smart,label='Smart elevator',bins=9,alpha=0.5)

plt.suptitle('Average timesteps taken: '+str(int(np.mean(timesteps_trolley)))+' (trolley) '+str(int(np.mean(timesteps_smart)))+' (smart)'+'\nMost timesteps taken: '+str(max(timesteps_trolley))+' (trolley) '+str(max(timesteps_smart))+' (smart)')
plt.text(s='p-value:'+str(ttest_ind(timesteps_trolley,timesteps_smart)[1]),y=50,x=1200,fontsize=12)
plt.xlabel('Timesteps required to finish simulation')
plt.ylabel('Relative frequency')
plt.show()