The main function

This is defined in a function called ES that has the following arguments: the name of the Gym environment, the size of the neural network's hidden layers, the total number of generations, the number of workers, the Adam learning rate, the batch size, and the standard deviation noise:

def ES(env_name, hidden_sizes=[8,8], number_iter=1000, num_workers=4, lr=0.01, batch_size=50, std_noise=0.01):

Then, we set an initial seed that is shared among the workers to initialize the parameters with the same weights. Moreover, we calculate the number of individuals that a worker has to generate and evaluate on each iteration and create two multiprocessing.Queue queues. These queues are the entry and exit points for the variables that are passed to and from the workers:

    initial_seed = np.random.randint(1e7)
indiv_per_worker = int(batch_size / num_workers)
output_queue = mp.Queue(maxsize=num_workers*indiv_per_worker)
params_queue = mp.Queue(maxsize=num_workers)

Next, the multiprocessing processes, multiprocessing.Process, are instantiated. These will run the worker function, which is given as the first argument to the Process constructor in an asynchronous way. All the other variables that are passed to the worker function are assigned to args and are pretty much the same as the parameters taken by ES, with the addition of the two queues. The processes start running when the start() method is called:

    processes = []

for widx in range(num_workers):

p = mp.Process(target=worker, args=(env_name, initial_seed, hidden_sizes, lr, std_noise, indiv_per_worker, str(widx), params_queue, output_queue))
p.start()
processes.append(p)

Once the parallel workers have started, we can iterate across the generations and wait until all the individuals have been generated and evaluated separately in each worker. Remember that the total number of individuals that are created on every generation is the number of workers, num_workers, multiplied by the individuals generated on each worker, indiv_per_worker. This architecture is unique to our implementation as we have only four CPU cores available, compared to the implementation in the paper, which benefits from thousands of CPUs. Generally, the population that's created on every generation is usually between 20 and 1,000:

    for n_iter in range(number_iter):
batch_seed
= []
batch_return = []

for _ in range(num_workers*indiv_per_worker):
p_rews, p_seed = output_queue.get()
batch_seed.append(p_seed)
batch_return.extend(p_rews)

In the previous snippet, output_queue.get() gets an element from output_queue, which is populated by the workers. In our implementation, output_queue.get() returns two elements. The first element, p_rews, is the fitness value (the return value) of the agent that's generated using p_seed, which is given as the second element. 

When the for cycle terminates, we rank the returns and put the batch returns and seeds on the params_queue queue, which will be read by all the workers to optimize the agent. The code for this is as follows:

        batch_return = normalized_rank(batch_return)

for _ in range(num_workers):
params_queue.put([batch_return, batch_seed])

Finally, when all the training iterations have been executed, we can terminate the workers:

    for p in processes:
p.terminate()

This concludes the main function. Now, all we need to do is implement the workers.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset