I am using Netty to exchange messages between a client and a server. For this, I maintain a channel pool on the client side. Please help me optimize this code.
/**
 * Singleton Netty client that sends line-terminated text messages to a single
 * server through a fixed-size pool of channels.
 *
 * <p>Pool bookkeeping: a {@link Semaphore} limits the number of concurrent
 * senders to the pool size, and a {@code used} flag per slot (guarded by the
 * monitor of this instance) hands each sender an exclusive slot. The channel
 * stored in a slot is only read or replaced by the thread that currently owns
 * that slot, so connecting can happen outside the monitor.
 *
 * <p>The event loop group created in the constructor is never shut down by
 * this class; it lives as long as the singleton does.
 */
public class Client {
    private static final Logger s_logger = Logger.getLogger(Client.class);
    private static volatile Client s_client;

    /** Value returned by {@link #sendMessage(String)} when the message was handed to the channel. */
    private static final int RESPONSE_OK = 200;
    /** Value returned by {@link #sendMessage(String)} when connecting or writing failed. */
    private static final int RESPONSE_FAILED = 404;

    private final String host;
    private final int port;
    /** Pool size, read from the configuration once so arrays and loops always agree. */
    private final int maxConnections;
    /** {@code used[i]} is true while slot {@code i} is owned by a sender; guarded by {@code this}. */
    private final boolean[] used;
    /** One permit per pool slot; acquired for the duration of a send. */
    private final Semaphore available;
    /** Channel cached in each slot, or null if the slot has never connected. */
    private final Channel[] channels;
    private final Bootstrap b;

    /** Number of pool slots currently claimed by senders (diagnostic counter). */
    public static final AtomicInteger USED_CHANNELS = new AtomicInteger();
    /** Number of semaphore permits currently held by senders (diagnostic counter). */
    public static final AtomicInteger SEMAPHORE_COUNT = new AtomicInteger();

    /**
     * Creates the pool bookkeeping and the bootstrap used for all connections.
     *
     * @param host server host to connect to
     * @param port server port to connect to
     */
    private Client(String host, int port) {
        this.host = host;
        this.port = port;
        this.maxConnections = Config.getMaxConnections();
        this.used = new boolean[maxConnections];
        this.available = new Semaphore(maxConnections);
        this.channels = new Channel[maxConnections];

        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final ChannelInitializer<SocketChannel> channelInitializer = new BasicNioClientChannelInitializer();
        b = new Bootstrap();
        b.group(workerGroup)
                .channel(NioSocketChannel.class)
                .handler(channelInitializer)
                .option(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Returns the shared client, creating it on first use from the configured
     * server address.
     *
     * @return the singleton client instance
     * @throws Exception declared for backward compatibility with existing callers
     */
    public static Client getClient() throws Exception {
        // Read the volatile field once; the second check runs under the class lock
        // so that only one instance is ever constructed.
        Client client = s_client;
        if (client == null) {
            synchronized (Client.class) {
                client = s_client;
                if (client == null) {
                    client = new Client(Config.getServerIp(), Config.getServerPort());
                    s_client = client;
                }
            }
        }
        return client;
    }

    /**
     * Sends {@code message}, terminated with CRLF, over a pooled channel.
     *
     * <p>Blocks until a pool slot is free. The write itself is asynchronous: a
     * return value of 200 means the message was handed to the channel, not that
     * the server received it.
     *
     * @param message text to send; CRLF is appended by this method
     * @return 200 if the message was written to an active channel, 404 if the
     *         connection could not be established or the write call failed
     * @throws InterruptedException if interrupted while waiting for a free slot
     * @throws Exception if no unused slot is found despite holding a permit
     *         (indicates broken pool bookkeeping)
     */
    public int sendMessage(final String message) throws Exception {
        final String messageToSend = message + "\r\n";

        available.acquire();
        SEMAPHORE_COUNT.incrementAndGet();
        try {
            final int slot = claimSlot();
            try {
                final Channel ch = obtainChannel(slot);
                if (s_logger.isDebugEnabled()) {
                    s_logger.debug("Sending to Server using Channel: " + slot + " data: " + message);
                }
                ch.writeAndFlush(messageToSend);
                return RESPONSE_OK;
            } catch (InterruptedException e) {
                // Keep the 404 contract but do not lose the interrupt request.
                Thread.currentThread().interrupt();
                s_logger.log(Level.FATAL, "***** Could not Connect to Server ****", e);
                return RESPONSE_FAILED;
            } catch (Exception e) {
                s_logger.log(Level.FATAL, "***** Could not Connect to Server ****", e);
                return RESPONSE_FAILED;
            } finally {
                // Release by index: the slot is freed even when connecting failed
                // and no channel was ever stored in it.
                releaseSlot(slot);
            }
        } finally {
            // Always return the permit, including when claimSlot() throws.
            SEMAPHORE_COUNT.decrementAndGet();
            available.release();
        }
    }

    /**
     * Marks the first free slot as used and returns its index.
     *
     * @return index of the claimed slot
     * @throws Exception if every slot is already marked as used
     */
    private synchronized int claimSlot() throws Exception {
        for (int i = 0; i < maxConnections; i++) {
            if (!used[i]) {
                used[i] = true;
                USED_CHANNELS.incrementAndGet();
                return i;
            }
        }
        // Should never happen: the semaphore admits at most maxConnections senders.
        throw new Exception("No unsused connections.");
    }

    /**
     * Marks {@code slot} as free again.
     *
     * @param slot index previously returned by {@link #claimSlot()}
     */
    private synchronized void releaseSlot(final int slot) {
        if (used[slot]) {
            used[slot] = false;
            USED_CHANNELS.decrementAndGet();
        }
    }

    /**
     * Returns an active channel for {@code slot}, reconnecting if the cached
     * channel is missing or no longer active. Must only be called by the thread
     * that currently owns the slot; it deliberately runs outside the monitor so
     * a slow connect does not block other senders.
     *
     * @param slot index of the slot owned by the calling thread
     * @return an active channel stored in the slot
     * @throws InterruptedException if interrupted while waiting for the connect
     * @throws IllegalStateException if the connect times out or fails
     */
    private Channel obtainChannel(final int slot) throws InterruptedException {
        final Channel cached = channels[slot];
        if (cached != null) {
            if (cached.isActive()) {
                return cached;
            }
            cached.close();
            channels[slot] = null;
        }

        final long maxWaitMillis = Config.getMaxWaitInMillisToConnect();
        final ChannelFuture cf = b.connect(host, port);
        if (!cf.await(maxWaitMillis)) {
            // Still pending: abandon the attempt so the half-open channel is not leaked.
            cf.cancel(false);
            cf.channel().close();
            throw new IllegalStateException(
                    "Unable to try connection to Server in " + maxWaitMillis + "ms");
        }
        // await() only reports completion; the attempt may still have failed.
        if (!cf.isSuccess()) {
            cf.channel().close();
            throw new IllegalStateException("Unable to connect to Server.", cf.cause());
        }

        final Channel connected = cf.channel();
        if (!connected.isActive()) {
            connected.close();
            throw new IllegalStateException("Unable to connect to Server.");
        }
        channels[slot] = connected;
        return connected;
    }
}