I wrote a simple multithreaded program that performs particle swarm optimization. The optimization itself and some of the supporting functions were taken from https://nathanrooy.github.io/posts/2016-08-17/simple-particle-swarm-optimization-with-python/. This is my first attempt at writing multithreaded Python code :)
The code consists of 4 files:
- consts.py - just some constants (imported as `consts` by the other modules)
- main.py - the entry point
- particle.py - the particle class, implemented as a thread
- particle_producer.py - the class which synchronizes all particles and computes their next position/velocity
Thank you for your help :)
consts.py
# Number of optimisation steps: the producer loop and every particle thread
# each run this many iterations.
MAX_ITERATIONS = 300
# Number of particles in the swarm; main.py sizes every shared dictionary and
# the list of particle threads with it.
NUMBER_OF_PARTICLES = 10
# Number of coordinates in each position / velocity vector.
NUM_DIMENSIONS = 2
main.py
import threading
import random
from consts import NUM_DIMENSIONS, NUMBER_OF_PARTICLES
from particle import Particle
from particle_producer import ParticleSwarmProducer
import numpy as np
# Function to optimize
def fnc(x):
    """Return the sum of squares of the coordinates in *x*.

    This is the objective being minimised; its minimum is 0 at the origin.

    Args:
        x: any iterable of numbers (list, tuple, NumPy array, generator, ...).

    Returns:
        The sum of ``v ** 2`` over all ``v`` in *x*; ``0`` for an empty input.
    """
    # sum() starts from 0 and adds left to right, exactly like the manual
    # accumulator, but does not need len() or indexing on the input.
    return sum(value ** 2 for value in x)
if __name__ == '__main__':
    # Search interval used for every coordinate.
    x_down, x_up = (-100, 100)

    # All shared data structures between particles and particle producer.
    # Every dictionary is keyed by particle id (0 .. NUMBER_OF_PARTICLES - 1).
    dict_shared_new_position = {
        i: list(np.random.uniform(x_down, x_up, NUM_DIMENSIONS))
        for i in range(NUMBER_OF_PARTICLES)
    }
    # Give every particle its OWN copy of the starting position.  A plain
    # dict.copy() is shallow: both dictionaries would hold the same list
    # objects, and the producer moves particles by mutating those lists in
    # place, which would silently drag the "best" positions along.
    dict_shared_best_position = {
        i: list(position) for i, position in dict_shared_new_position.items()
    }
    # Draw each velocity component independently.  `[value] * n` would repeat
    # one random draw across all dimensions.
    dict_velocity = {
        i: [random.uniform(-1, 1) for _ in range(NUM_DIMENSIONS)]
        for i in range(NUMBER_OF_PARTICLES)
    }
    # -1 marks "not evaluated yet".
    dict_shared_errors = {i: -1 for i in range(NUMBER_OF_PARTICLES)}
    dict_shared_is_ready = {i: False for i in range(NUMBER_OF_PARTICLES)}
    # One (lower, upper) pair per particle; the producer indexes this list by
    # particle id.
    bounds = [(x_down, x_up) for _ in range(NUMBER_OF_PARTICLES)]
    condition_wait = threading.Condition()

    producer = ParticleSwarmProducer(
        initial_particle_position=dict_shared_new_position,
        bounds=bounds,
        dict_shared_errors=dict_shared_errors,
        dict_shared_is_ready=dict_shared_is_ready,
        dict_shared_new_position=dict_shared_new_position,
        dict_shared_best_positions=dict_shared_best_position,
        dict_velocity=dict_velocity,
        condition_wait=condition_wait,
    )

    particles = [
        Particle(
            thread_id=i,
            name='Thread ' + str(i + 1),
            dict_shared_errors=dict_shared_errors,
            dict_shared_is_ready=dict_shared_is_ready,
            dict_shared_new_position=dict_shared_new_position,
            dict_shared_best_positions=dict_shared_best_position,
            fnc=fnc,
            condition_wait=condition_wait,
        )
        for i in range(NUMBER_OF_PARTICLES)
    ]

    # Start the coordinator first, then the workers, and wait for all of them.
    producer.start()
    for p in particles:
        p.start()

    producer.join()
    for p in particles:
        p.join()
particle.py
import threading
from consts import MAX_ITERATIONS
# Lock created when this module is imported.  particle_producer.py creates its
# own, separate `threadLock`, so the two modules do not share this object.
threadLock = threading.Lock()
class Particle(threading.Thread):
    """Worker thread that evaluates the objective function for one particle.

    Each iteration the thread announces that it is ready, blocks on
    ``condition_wait`` until it is notified, then evaluates ``fnc`` at its
    current position and publishes the result.  The thread only ever reads
    and writes the entries stored under its own ``thread_id``.

    Args:
        thread_id: key of this particle in every shared dictionary.
        name: thread name.
        dict_shared_errors: ``{id: error}``; this thread writes its latest
            objective value here.
        dict_shared_is_ready: ``{id: bool}``; True while this thread is
            waiting for the next notification.
        dict_shared_new_position: ``{id: position}``; read once per iteration.
        dict_shared_best_positions: ``{id: position}``; receives a copy of the
            best position this particle has evaluated so far.
        fnc: objective function, called as ``fnc(position)``.
        condition_wait: ``threading.Condition`` the coordinator notifies to
            release the particles.
        max_iterations: number of wait/evaluate cycles to run.  ``None``
            (default) uses ``MAX_ITERATIONS``.  The thread blocks once per
            cycle, so the coordinator has to notify at least this many times.
    """

    def __init__(self,
                 thread_id,
                 name,
                 dict_shared_errors,
                 dict_shared_is_ready,
                 dict_shared_new_position,
                 dict_shared_best_positions,
                 fnc,
                 condition_wait,
                 max_iterations=None
                 ):
        super().__init__()
        self.thread_id = thread_id
        self.name = name
        self.dict_shared_errors = dict_shared_errors
        self.dict_shared_is_ready = dict_shared_is_ready
        self.dict_shared_new_positions = dict_shared_new_position
        self.dict_shared_best_positions = dict_shared_best_positions
        self.fnc = fnc
        self.condition_wait = condition_wait
        self.max_iterations = max_iterations

    def run(self) -> None:
        """Run the wait -> evaluate -> publish cycle for every iteration."""
        if self.max_iterations is None:
            iterations = MAX_ITERATIONS
        else:
            iterations = self.max_iterations

        # Start at +inf so the first evaluation is always recorded; a fixed
        # number such as 9999 is smaller than objective values that are
        # reachable inside the search bounds.
        best_particle_error = float('inf')

        for _ in range(iterations):
            with self.condition_wait:
                # Publish readiness while holding the condition's lock and
                # keep holding it until wait() releases it.  Whoever sees the
                # flag and then notifies must take this same lock first, so
                # the notification cannot arrive before we are waiting.
                self.dict_shared_is_ready[self.thread_id] = True
                self.condition_wait.wait()
                # Busy again: clear the flag before evaluating so that it is
                # not observed as "ready" while the evaluation is running.
                self.dict_shared_is_ready[self.thread_id] = False

            # No lock is held here: only this thread's own keys are touched,
            # and evaluations of different particles may overlap.
            position = self.dict_shared_new_positions[self.thread_id]
            error = self.fnc(position)
            if error < best_particle_error:
                # Store a copy; the position list itself may be modified in
                # place by whoever moves the particle.
                self.dict_shared_best_positions[self.thread_id] = list(position)
                best_particle_error = error
            self.dict_shared_errors[self.thread_id] = error
particle_producer.py
import threading
import time
import random
import numpy as np
from consts import NUM_DIMENSIONS, NUMBER_OF_PARTICLES, MAX_ITERATIONS
# Lock created when this module is imported.  particle.py creates its own,
# separate `threadLock`, so the two modules do not share this object.
threadLock = threading.Lock()
class ParticleSwarmProducer(threading.Thread):
    """Coordinator thread: waits for all particles, then moves the swarm.

    ``run`` polls the shared readiness flags.  Whenever all of them are set
    it records the current positions, refreshes the group best, computes new
    velocities and positions, and notifies ``condition_wait``.

    The set of particles and the number of dimensions are taken from the
    shared dictionaries themselves (their keys and the length of each
    position list).

    Args:
        initial_particle_position: starting positions keyed by particle id
            (stored on the instance; not otherwise used by this class).
        bounds: sequence indexed by particle id; ``bounds[i][0]`` and
            ``bounds[i][1]`` are the lower/upper limit applied to every
            coordinate of particle ``i``.
        dict_shared_errors: ``{id: error}`` of the latest evaluation.
        dict_shared_is_ready: ``{id: bool}`` readiness flags.
        dict_shared_new_position: ``{id: position list}``; updated in place.
        dict_shared_best_positions: ``{id: position}`` personal bests.
        dict_velocity: ``{id: velocity list}``; updated in place.
        condition_wait: ``threading.Condition`` notified after every step.
        max_iterations: number of swarm steps to perform.  ``None`` (default)
            uses ``MAX_ITERATIONS``.
    """

    def __init__(self,
                 initial_particle_position,
                 bounds,
                 dict_shared_errors,
                 dict_shared_is_ready,
                 dict_shared_new_position,
                 dict_shared_best_positions,
                 dict_velocity,
                 condition_wait,
                 max_iterations=None):
        super().__init__()
        self.initial_particle_positions = initial_particle_position
        self.bounds = bounds
        self.dict_velocity = dict_velocity
        self.dict_best_positions = dict_shared_best_positions
        self.dict_shared_errors = dict_shared_errors
        self.dict_shared_is_ready = dict_shared_is_ready
        self.dict_shared_new_position = dict_shared_new_position
        self.condition_wait = condition_wait
        self.max_iterations = max_iterations
        self.current_iteration = 0
        self.err_best_g = -1  # best error for group (-1: nothing recorded)
        self.pos_best_g = []  # best position for group

        # Used for plotting: one (steps x dimensions) array per particle,
        # growing by one row per swarm step.
        self.output_pos = {
            i: np.empty((0, len(position)))
            for i, position in dict_shared_new_position.items()
        }

    def run(self) -> None:
        """Poll until every particle is ready, then step the swarm; repeat."""
        if self.max_iterations is None:
            total_iterations = MAX_ITERATIONS
        else:
            total_iterations = self.max_iterations

        i = 1
        while i <= total_iterations:
            # `with` releases the lock even if a step raises.
            with threadLock:
                # If all particles have finished their current jobs...
                if all(self.dict_shared_is_ready.values()):
                    print(f'Iteration {i}')
                    print(f'Current positions: {self.dict_shared_new_position}')
                    print(f'Current errors: {self.dict_shared_errors}')
                    print(f'Current velocities: {self.dict_velocity}')
                    self.add_pos_to_out()
                    self.evaluate_all_particles()
                    self.update_all_particles()
                    print('All particles go!')
                    with self.condition_wait:
                        # notify_all() is the supported spelling; notifyAll()
                        # is a deprecated alias.
                        self.condition_wait.notify_all()
                    i += 1
            # Polling interval between readiness checks.
            time.sleep(0.02)

        # One more notification after the last step, as a final wake-up for
        # any thread still blocked on the condition.
        with self.condition_wait:
            self.condition_wait.notify_all()
        print(f'Current positions: {self.dict_shared_new_position}')
        print(f'Error: {self.err_best_g}')

    def evaluate_all_particles(self):
        """Update the group best from the latest per-particle errors."""
        for i, error in self.dict_shared_errors.items():
            # -1 is the "nothing recorded yet" sentinel for the group best.
            # NOTE(review): an error value of exactly -1 is indistinguishable
            # from that sentinel and is overwritten by the next particle.
            if error < self.err_best_g or self.err_best_g == -1:
                # Copy: the position list is modified in place afterwards.
                self.pos_best_g = list(self.dict_shared_new_position[i])
                self.err_best_g = float(error)

    def update_all_particles(self):
        """Compute the next velocity and position of every particle."""
        for i in self.dict_shared_new_position:
            self.update_velocity(i)
            self.update_position(i)

    def add_pos_to_out(self):
        """Append every particle's current position to its history array."""
        for i, position in self.dict_shared_new_position.items():
            self.output_pos[i] = np.vstack((self.output_pos[i], position))

    def update_velocity(self, i):
        """Update the velocity of particle *i* in place.

        Each component becomes ``w * v + c1 * r1 * (personal_best - x)
        + c2 * r2 * (group_best - x)`` with fresh random ``r1``, ``r2`` in
        [0, 1) per component.
        """
        w = 0.5  # constant inertia weight (how much to weigh the previous velocity)
        c1 = 1   # cognitive constant
        c2 = 2   # social constant

        position = self.dict_shared_new_position[i]
        personal_best = self.dict_best_positions[i]
        velocity = self.dict_velocity[i]
        for j, coordinate in enumerate(position):
            r1 = random.random()
            r2 = random.random()
            vel_cognitive = c1 * r1 * (personal_best[j] - coordinate)
            vel_social = c2 * r2 * (self.pos_best_g[j] - coordinate)
            velocity[j] = w * velocity[j] + vel_cognitive + vel_social

    def update_position(self, i):
        """Move particle *i* by its velocity, clamped to its bounds (in place)."""
        lower = self.bounds[i][0]
        upper = self.bounds[i][1]
        position = self.dict_shared_new_position[i]
        velocity = self.dict_velocity[i]
        for j in range(len(position)):
            # Cap at the upper bound first, then at the lower bound.
            position[j] = max(min(position[j] + velocity[j], upper), lower)