The following is the main file of the project (it executes the tasks). It uses the mpi4py
module, and multiple processes execute the same code. Please review the code and suggest ways to make it more efficient and Pythonic.
from mpi4py import MPI
from Peers import Peers
from SegmentHandler import SegmentHandler
from Communicator import Communicator
import Message
import Datastore
import Queue
import random
import Logging
class JobScheduler(object):
    """Schedule segment downloads across the processes of one MPI job.

    Every process runs the same code.  The process whose ``procId`` equals
    ``SegmentAssignProcId`` hands out download requests (``microDownload``);
    every other process runs the randomized worker loop (``microNC``).
    """

    # Upper bound on a peer's backlog before the assigner stops sending
    # it new download requests.
    MAX_BACKLOG = 5
    # procId of the process that assigns segments to the other processes.
    SegmentAssignProcId = 0

    def __init__(self, environment):
        """Create the helpers and the (initially empty) local queues.

        ``environment`` must expose ``procId``, ``totalProc``, ``send`` and
        ``nonBlockingReceive``; these are the only members used here.
        """
        self.environment = environment
        self.segmentHandler = SegmentHandler()
        self.peers = Peers(self.environment.totalProc)
        self.dataHandler = Datastore.Datastore()
        # Create the queues up front so that stopMicroNC() and the queue
        # handlers never hit an AttributeError when called before microNC().
        self.initLocalQueue()

    def isSegmentAssigner(self):
        """Return True when this process is the segment assigner."""
        return self.environment.procId == self.SegmentAssignProcId

    def runMicroDownload(self):
        """Run the assignment loop, but only on the assigner process."""
        if self.isSegmentAssigner():
            self.microDownload()

    def initLocalQueue(self):
        """(Re)create the three local work queues, discarding old content."""
        self.toBeAdvertised = Queue.Queue()
        self.requestQueue = Queue.Queue()
        self.downloadRequests = Queue.Queue()

    def handleAdvertisementQueue(self):
        """Pop one message to advertise and send its response to the peers.

        Does nothing when the advertisement queue is empty.
        """
        if self.toBeAdvertised.empty():
            return
        adMessage = self.toBeAdvertised.get()
        Logging.logProcessOp(processId=self.environment.procId,
                             op="pop",
                             depQueue=self.toBeAdvertised,
                             message=adMessage)
        reqMessage = adMessage.getResponse()
        # Broadcast to every process except the assigner.  The same message
        # object is re-addressed before each send.
        for targetId in range(self.environment.totalProc):
            if targetId != self.SegmentAssignProcId:
                reqMessage.receiver = targetId
                self.environment.send(targetId, reqMessage)

    def handleDownloadRequestQueue(self):
        """Pop one download request, download it, store it, queue the ad.

        Does nothing when the download-request queue is empty.
        """
        if self.downloadRequests.empty():
            return
        dwnldReqMessage = self.downloadRequests.get()
        dwnldReqMessage.download()
        Logging.logProcessOp(processId=self.environment.procId,
                             op="pop",
                             depQueue=self.downloadRequests,
                             message=dwnldReqMessage)
        self.dataHandler.addSegment(dwnldReqMessage.messageId,
                                    dwnldReqMessage.content)
        self.dataHandler.store(dwnldReqMessage)
        # The freshly downloaded segment is advertised on a later turn.
        self.toBeAdvertised.put(dwnldReqMessage)

    def handleRequestQueue(self):
        """Pop one request and send its response back to the initiator.

        Does nothing when the request queue is empty.
        """
        if self.requestQueue.empty():
            return
        reqMessage = self.requestQueue.get()
        responseMsg = reqMessage.getResponse()
        Logging.logProcessOp(processId=self.environment.procId,
                             op="pop",
                             depQueue=self.requestQueue,
                             message=reqMessage)
        self.environment.send(responseMsg.receiver, responseMsg)

    def runMicroNC(self):
        """Run the worker loop on every process except the assigner."""
        if not self.isSegmentAssigner():
            self.microNC()

    def stopMicroNC(self):
        """Return True once all queues are empty and the datastore is done.

        The datastore is only consulted when all three queues are empty.
        """
        queuesDrained = (self.toBeAdvertised.empty()
                         and self.requestQueue.empty()
                         and self.downloadRequests.empty())
        # NOTE(review): the literal 4 is passed straight to
        # Datastore.downlodedAll; presumably the expected number of
        # segments -- confirm and consider making it configurable.
        return queuesDrained and bool(self.dataHandler.downlodedAll(4))

    def microNC(self):
        """Worker loop: repeatedly perform one randomly chosen action.

        Each iteration either polls for an incoming message or services one
        of the three local queues, until ``stopMicroNC()`` returns True.
        """
        Logging.logProcessOp(processId=self.environment.procId,
                             op="startMicroNC")
        self.initLocalQueue()
        # Index order matters: 0 = receive, 1 = requests, 2 = downloads,
        # 3 = advertisements.
        actions = (self._receiveAndDispatch,
                   self.handleRequestQueue,
                   self.handleDownloadRequestQueue,
                   self.handleAdvertisementQueue)
        while not self.stopMicroNC():
            actions[random.randrange(len(actions))]()

    def _receiveAndDispatch(self):
        """Poll for one message and route it according to its type."""
        message = self.environment.nonBlockingReceive()
        if not message:
            return
        Logging.logChannelOp(message.sender,
                             message.receiver,
                             'receive',
                             message)
        # Independent checks (not elif): a message matching several of
        # these classes is handled by every matching branch.
        if isinstance(message, Message.DownloadRequestMessage):
            self._enqueue(self.downloadRequests, message)
        if isinstance(message, Message.AdvertisementMessage):
            self._requestAdvertisedSegment(message)
        if isinstance(message, Message.RequestMessage):
            self._enqueue(self.requestQueue, message)
        if isinstance(message, Message.SegmentMessage):
            Logging.logProcessOp(processId=self.environment.procId,
                                 op="receivedSegments")
            self.dataHandler.addSegment(message.messageId, message.content)
            self.dataHandler.store(message)
        if isinstance(message, Message.RequestResponseMessage):
            self._handleDownloadFeedback(message)

    def _enqueue(self, depQueue, message):
        """Log a push and append ``message`` to ``depQueue``."""
        Logging.logProcessOp(processId=self.environment.procId,
                             op="push",
                             depQueue=depQueue,
                             message=message)
        depQueue.put(message)

    def _requestAdvertisedSegment(self, message):
        """Ask the advertiser for a segment this process does not hold."""
        if self.dataHandler.getSegment(message.messageId):
            return
        response = message.getResponse()
        Logging.logChannelOp(message.sender,
                             message.receiver,
                             'send',
                             response)
        self.environment.send(response.receiver, response)

    def _handleDownloadFeedback(self, message):
        """Process a peer's download confirmation on the assigner."""
        # NOTE(review): the original read ``environment.processId``; every
        # other access uses ``procId``, so this is treated as a typo.
        # NOTE(review): runMicroNC() only starts this loop on non-assigner
        # processes, so this assertion fails whenever such a message
        # arrives there -- confirm where feedback is meant to be handled.
        assert self.environment.procId == self.SegmentAssignProcId
        Logging.logProcessOp(processId=self.environment.procId,
                             op="receivedDownloadConfirmation")
        self.peers.removeBackLog(message.sender)
        if message.status:
            return
        # Failed download: release the segment and issue a new request to
        # the same peer.
        # NOTE(review): the source was pasted without indentation; this
        # re-request is assumed to belong to the failure branch -- confirm.
        self.segmentHandler.unassignSegment(message.messageId)
        reqSegId = self.segmentHandler.getNextUnassigned()
        requestSegment = Message.RequestMessage(self.SegmentAssignProcId,
                                                reqSegId)
        requestSegment.initMessage(
            msgProperty=None,
            msgContent=self.segmentHandler.getMetadata(reqSegId))
        self.environment.send(message.sender, requestSegment)
        self.segmentHandler.assignSegment(reqSegId)

    def microDownload(self):
        """Assigner loop: hand unassigned segments to the least busy peer.

        Runs until the segment handler reports that all segments are
        assigned.
        """
        # NOTE(review): the op label below is "startMicroNC" although this
        # is the download loop; kept unchanged in case logs are parsed.
        Logging.logProcessOp(processId=self.environment.procId,
                             op="startMicroNC")
        self.segmentHandler.downloadMetadata()
        self.peers.initPeers(self.environment.totalProc)
        while not self.segmentHandler.allAssigned():
            peerId = self.peers.leastBusyPeer()
            if self.peers.getBackLog(peerId) >= self.MAX_BACKLOG:
                # NOTE(review): nothing is received here, so no backlog is
                # reduced by this loop itself; it spins until another code
                # path calls peers.removeBackLog().
                Logging.info("Waiting for feedback")
                continue
            segmentId = self.segmentHandler.getNextUnassigned()
            requestSegment = Message.DownloadRequestMessage(
                self.SegmentAssignProcId,
                segmentId,
                peerId)
            requestSegment.initMessage(
                msgProperty=None,
                msgContent=self.segmentHandler.getMetadata(segmentId))
            self.environment.send(peerId, requestSegment)
            self.segmentHandler.assignSegment(segmentId)
            self.peers.addBackLog(peerId)
            Logging.logChannelOp(chanFrom=self.SegmentAssignProcId,
                                 chanTo=peerId,
                                 op="send",
                                 message=requestSegment)