I wrote a simple server to distribute locks on a network. It is used in a multi-server environment with a shared home directory to serialize file writes that are not multiprocessing-safe (PyTables and HDF5).
The idea is rather simple: a lock client (with a unique id) asks the server whether a lock with a given name is already taken. If it is, the client WAITs and retries until the lock is released; once it is free, the client locks it and GOes on into its sequential block.
The server keeps a dictionary of locks with lock names as keys and client ids as values. The client ids are stored so that a lock can only be released by the client that locked it in the first place.
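For illustration, the resulting wire protocol is nothing more than delimited strings over a ZeroMQ REQ/REP pair; the lock name and client ids below are made-up examples:

LOCK:my_lock:hostA__1234    ->  GO                          (lock acquired, enter the sequential block)
LOCK:my_lock:hostB__5678    ->  WAIT                        (held by another client, retry later)
UNLOCK:my_lock:hostA__1234  ->  UNLOCKED                    (released by the owner)
UNLOCK:my_lock:hostB__5678  ->  RELEASE_ERROR:...           (a client that does not own the lock tried to release it)
PING                        ->  PONG                        (connection test)
DONE                        ->  CLOSE:Closing Lock Server   (shuts the server down)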
Server Script:
import logging
import zmq
class LockerServer(object):
""" Server that manages locks across a network """
PING = 'PING'
PONG = 'PONG'
DONE = 'DONE'
LOCK = 'LOCK'
RELEASE_ERROR = 'RELEASE_ERROR'
MSG_ERROR = 'MSG_ERROR'
UNLOCK = 'UNLOCK'
UNLOCKED = 'UNLOCKED'
GO = 'GO'
WAIT = 'WAIT'
DELIMITER = ':'
DEFAULT_LOCK = '_DEFAULT_'
CLOSE = 'CLOSE'
def __init__(self, url="tcp://127.0.0.1:7899"):
self._locks = {}
self._url = url
self._logger = None
def _lock(self, name, id_):
if name not in self._locks:
# Lock is available and unlocked.
# Client locks it (aka addition to dict) and
# can go on into a sequential code block.
self._locks[name] = id_
return self.GO
else:
# Lock is not available and locked by someone else.
# Client needs to wait until it is released
return self.WAIT
def _unlock(self, name, id_):
locker_id = self._locks[name]
if locker_id != id_:
# Locks can only be locked and
# then unlocked by the same client.
response = (self.RELEASE_ERROR + self.DELIMITER +
'Lock was acquired by `%s` and not by '
'`%s`.' % (locker_id, id_))
self._logger.error(response)
return response
else:
# Unlocks the lock (aka removal from dict).
del self._locks[name]
return self.UNLOCKED
def run(self):
"""Runs Server"""
try:
self._logger = logging.getLogger('LockServer')
self._logger.info('Starting Lock Server')
context = zmq.Context()
socket_ = context.socket(zmq.REP)
socket_.bind(self._url)
while True:
msg = socket_.recv_string()
name = None
id_ = None
if self.DELIMITER in msg:
msg, name, id_ = msg.split(self.DELIMITER)
if msg == self.DONE:
socket_.send_string(self.CLOSE + self.DELIMITER +
'Closing Lock Server')
self._logger.info('Closing Lock Server')
break
elif msg == self.LOCK:
if name is None or id_ is None:
response = (self.MSG_ERROR + self.DELIMITER +
'Please provide name and id for locking')
self._logger.error(response)
else:
response = self._lock(name, id_)
socket_.send_string(response)
elif msg == self.UNLOCK:
if name is None or id_ is None:
response = (self.MSG_ERROR + self.DELIMITER +
'Please provide name and id for unlocking')
self._logger.error(response)
else:
response = self._unlock(name, id_)
socket_.send_string(response)
elif msg == self.PING:
socket_.send_string(self.PONG)
else:
response = (self.MSG_ERROR + self.DELIMITER +
'MSG `%s` not understood' % msg)
self._logger.error(response)
socket_.send_string(response)
except Exception:
self._logger.exception('Crashed Lock Server!')
raise
def run_server():
logging.basicConfig(level=logging.DEBUG)
server = LockerServer()
server.run()
if __name__ == '__main__':
run_server()
Client and Test Script:
import time
import os
import multiprocessing as mp
import socket
import zmq
import random
from lock_server import LockerServer
class LockerClient(object):
""" Implements a Lock using req-rep scheme with LockerServer """
SLEEP = 0.01
def __init__(self, url="tcp://127.0.0.1:7899",
lock_name=LockerServer.DEFAULT_LOCK):
self.lock_name = lock_name
self.url = url
self._context = None
self._socket = None
self.id = None
def __getstate__(self):
result_dict = self.__dict__.copy()
# Do not pickle zmq data
result_dict['_context'] = None
result_dict['_socket'] = None
return result_dict
def start(self):
"""Starts connection to server.
Makes ping-pong test as well
"""
self._context = zmq.Context()
self._socket = self._context.socket(zmq.REQ)
self._socket.connect(self.url)
self.test_ping()
# create a unique id based on host name and process id
# (what about many clients per process?? should that be considered?)
self.id = (socket.getfqdn().replace(LockerServer.DELIMITER, '-') +
'__' + str(os.getpid()))
def send_done(self):
"""Notifies the Server to shutdown"""
if self._socket is None:
self.start()
self._socket.send_string(LockerServer.DONE)
self._socket.recv_string() # Final receiving of closing
def test_ping(self):
"""Connection test"""
self._socket.send_string(LockerServer.PING)
response = self._socket.recv_string()
if response != LockerServer.PONG:
raise RuntimeError('Connection Error to Lock Server')
def finalize(self):
"""Closes socket"""
if self._socket is not None:
self._socket.close()
self._socket = None
self._context = None
def acquire(self):
"""Acquires lock and returns `True`
Blocks until lock is available.
"""
if self._context is None:
self.start()
request = (LockerServer.LOCK + LockerServer.DELIMITER +
self.lock_name + LockerServer.DELIMITER + self.id)
while True:
self._socket.send_string(request)
response = self._socket.recv_string()
if response == LockerServer.GO:
return True
elif response == LockerServer.WAIT:
time.sleep(self.SLEEP)
else:
raise RuntimeError('Response `%s` not understood' % response)
def release(self):
"""Releases lock"""
request = (LockerServer.UNLOCK + LockerServer.DELIMITER +
self.lock_name + LockerServer.DELIMITER + self.id)
self._socket.send_string(request)
response = self._socket.recv_string()
if response != LockerServer.UNLOCKED:
raise RuntimeError('Could not release lock `%s` (`%s`) '
'because of `%s`!' % (self.lock_name,
self.id, response))
def the_job(idx):
"""Simple job executed in parallel
Just sleeps randomly and prints to console.
Capital letters signal parallel printing
"""
lock = LockerClient()
random.seed(idx)
sleep_time = random.uniform(0.0, 0.07) # Random sleep time
lock.acquire()
print('This')
time.sleep(sleep_time)
print('is')
time.sleep(sleep_time*1.5)
print('a')
time.sleep(sleep_time*3.0)
print('sequential')
time.sleep(sleep_time)
print('block')
time.sleep(sleep_time)
print('by %d' % idx)
lock.release()
print('_HAPPENING')
time.sleep(sleep_time)
print('_IN')
time.sleep(sleep_time/2.0)
print('_PARALLEL')
time.sleep(sleep_time*1.5)
print('_BY %d' % idx)
def run_pool():
# Create a pool and run the job in parallel
pool = mp.Pool(8)
pool.map(the_job, range(16))
pool.close()
pool.join()
# Tell server to shutdown
lock = LockerClient()
lock.send_done()
if __name__ == '__main__':
run_pool()
To test it, put both in separate Python files, say lock_server.py and lock_test.py. First start the server with python lock_server.py and then run the clients via python lock_test.py.
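In the actual use case the lock then guards the PyTables/HDF5 writes. A minimal sketch of that pattern (the file path, lock name and array layout are only placeholders, not part of the code above):

import numpy as np
import tables as pt
from lock_test import LockerClient  # or wherever the client class lives

def guarded_append(idx):
    """Append a value to a shared HDF5 file.
    The lock server serializes writers across all machines.
    """
    lock = LockerClient(lock_name='hdf5_write')
    lock.acquire()
    try:
        with pt.open_file('/shared/home/results.h5', mode='a') as h5file:
            if '/results' not in h5file:
                h5file.create_earray('/', 'results',
                                     atom=pt.Int64Atom(), shape=(0,))
            h5file.root.results.append(np.array([idx]))
    finally:
        lock.release()
        lock.finalize()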
The output should look like this:
_HAPPENING
This
_IN
_PARALLEL
is
_PARALLEL
_BY 1
a
sequential
block
_BY 15
by 3
The text This is a sequential block by <idx> should always appear in order, i.e. something like This a is This block block sequential should never occur. All other (capital) messages can appear at any time.
I implicitly assume that for every Python process and every lock_name there exists only a single client, i.e. the unique id does not distinguish between several clients within one process. Should I account for this?
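If several clients per process ever become necessary, one option (not implemented above) would be to mix a per-instance token into the id, e.g.:

import os
import socket
import uuid

def make_client_id(delimiter=':'):
    """Builds an id that is unique per client instance rather than only per
    process. The uuid suffix is a suggestion, not part of the code above.
    """
    return (socket.getfqdn().replace(delimiter, '-') +
            '__' + str(os.getpid()) +
            '__' + uuid.uuid4().hex[:8])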
By the way, in my use case I only need a single lock, i.e. keeping a dictionary of potentially several locks is not necessary, but I wanted a generic approach.
Are there any flaws with this design? Anything that might lead to concurrent writing in sequential blocks?
_HAPENNING is misspelt. It should be _HAPPENING. – Quill Aug 18 at 9:02