I am writing this class for parallel chunk level uploads to my local server. This reads the files in a given directory, uploads with n threads in parallel.
Initially, sends a request and obtains a request id for unique identification. Then, proceeds to upload the chunks in parallel. One thread handles upload of a file. The upload process is in the class FileUploader
. I am calling a function once the code is completed. This is a recursive call. I think the bottleneck occurs when the file size is really huge.
Here's the code:
public class TestUploader
{
private String mainDir = Constants.dumpPath.substring(Constants.dumpPath.lastIndexOf("\\") + 1);
private String baseUrl = "<baseurl>";
private Map<String, String> params = null;
private ExecutorService execService;
private Map<String, String> folderIDMap;
private Map<String, String> fileIDMap;
private static Stopwatch timer = null;
private int chunkLength = 1048576 * 4;
public TestUploader() throws SAXException, IOException, ParserConfigurationException
{
// Initialize all the request and try to login
}
public static void main(String[] args)
{
TestUploader testUploader = null;
try
{
testUploader = new TestUploader();
testUploader.startUpload();
}
catch (SAXException e)
{
e.printStackTrace();
}
catch (IOException e)
{
e.printStackTrace();
}
catch (ParserConfigurationException e)
{
e.printStackTrace();
}
finally
{
if (testUploader != null)
try
{
testUploader.logout();
timer.stop();
System.out.println("Upload completed in " + timer.elapsed(TimeUnit.SECONDS) + " seconds");
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
public void startUpload() throws ClientProtocolException, IOException, ParserConfigurationException, SAXException
{
String parentDir = mainDir;
folderIDMap = new HashMap<String, String>();
fileIDMap = new HashMap<String, String>();
createFolder(parentDir, null);
File folder = new File(Constants.dumpPath);
HttpClient httpclient = HttpClientBuilder.create().setMaxConnPerRoute(Constants.NTHREDS)
.setMaxConnTotal(Constants.NTHREDS).build();
execService = Executors.newFixedThreadPool(Constants.NTHREDS);
requestExecService = new FutureRequestExecutionService(httpclient, execService);
new HashMap<String, HttpRequestFutureTask<String>>();
try
{
for (File file : folder.listFiles())
{
if (!file.isDirectory())
continue;
createFolder(file.getName(), parentDir);
}
ExecutorService executor = Executors.newFixedThreadPool(Constants.NTHREDS);
for (File file : folder.listFiles())
{
if (file.isDirectory())
{
for (File subFile : file.listFiles())
{
try
{
FileUploader fileUploader = new FileUploader(subFile.getAbsolutePath());
executor.execute(fileUploader);
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
}
executor.shutdown();
try
{
executor.awaitTermination(10, TimeUnit.HOURS);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
finally
{
if (requestExecService != null)
requestExecService.close();
execService.shutdown();
System.out.println("Folder IDs:");
for (Entry<String, String> entry : folderIDMap.entrySet())
System.out.println(entry.getKey() + " " + entry.getValue());
System.out.println("File IDs:");
for (Entry<String, String> entry : fileIDMap.entrySet())
System.out.println(entry.getKey() + " " + entry.getValue());
}
}
private void createFolder(String folderName, String parentFolderName) throws ClientProtocolException, IOException,
ParserConfigurationException, SAXException
{
// Create folder request
}
public void logout() throws ClientProtocolException, IOException
{
// Logout request
}
class FileUploader implements Runnable
{
private final String url;
private File file;
private FileInputStream fileInputStream = null;
private byte bytes[] = null;
private String chunkOffset = null;
private String requestId = null;
private String eof = null;
private long fileSize;
private boolean once = false;
public FileUploader(String filePath) throws FileNotFoundException, IOException, ParserConfigurationException,
SAXException
{
url = baseUrl + "/drive/upload?uploadType=chunkedFile";
file = new File(filePath);
fileInputStream = new FileInputStream(file);
fileSize = file.length();
eof = "0";
chunkRequest();
}
private void chunkRequest() throws FileNotFoundException, IOException, ParserConfigurationException,
SAXException
{
HttpPost post = new HttpPost(url);
post.setHeader("Authtoken", params.get("token"));
String fileName = file.getName();
long lastModified = file.lastModified() / 1000L;
post.setHeader("ParentFolderPath", Base64.encodeBase64String((params.get("syncWebFolderId") + "\\"
+ mainDir + "\\" + file.getParentFile().getName()).getBytes()));
post.setHeader("FileName", Base64.encodeBase64String(fileName.getBytes()));
post.setHeader("FileSize", Long.toString(fileSize));
post.setHeader("FileModifiedtime", Long.toString(lastModified));
if (fileSize <= chunkLength)
{
bytes = new byte[(int) fileSize];
eof = "1";
}
else
bytes = new byte[chunkLength];
post.setHeader("FileEOF", eof);
fileInputStream.read(bytes);
post.setEntity(new ByteArrayEntity(bytes));
// System.out.println("Sending chunk request for " + fileName);
HttpResponse response = HttpClientBuilder.create().build().execute(post);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK)
{
String responseString = new BasicResponseHandler().handleResponse(response);
// System.out.println("Response String for " + fileName + ": " + responseString);
Element documentElement = getXMLDocument(responseString).getDocumentElement();
int errorCode = Integer.parseInt(documentElement.getAttribute("errorCode"));
if (errorCode == 200)
{
if (eof.equals("0"))
{
requestId = documentElement.getAttribute("requestId");
chunkOffset = documentElement.getAttribute("chunkOffset");
// System.out.println("Initial upload for " + fileName + " " + requestId + " " + chunkOffset);
}
else
{
System.out.println("Upload done for " + fileName);
fileIDMap.put(file.getParentFile().getName() + "\\" + fileName,
documentElement.getAttribute("fileID"));
}
}
else
{
String errorString = documentElement.getAttribute("errorString");
throw new ClientProtocolException("Error under chunkRequest for " + fileName + "! Error code: "
+ errorCode + "\nError String: " + errorString);
}
}
else if (statusCode == HttpStatus.SC_CONFLICT)
{
InputStream inputStream = response.getEntity().getContent();
Element documentElement = getXMLDocument(IOUtils.toString(inputStream, "UTF-8")).getDocumentElement();
IOUtils.closeQuietly(inputStream);
requestId = documentElement.getAttribute("requestId");
chunkOffset = documentElement.getAttribute("chunkOffset");
System.out.println("Conflict at chunkRequest for " + fileName + ". RequestId: " + requestId
+ " . ChunkOffset: " + chunkOffset);
// throw new ClientProtocolException("Failed under chunkRequest for " + fileName + "! Status code: "
// + statusCode + "\nReason: " + response.getStatusLine().getReasonPhrase());
}
else
throw new ClientProtocolException("Failed under chunkRequest for " + fileName + "! Status code: "
+ statusCode + "\nReason: " + response.getStatusLine().getReasonPhrase());
}
@Override
public void run()
{
Thread.currentThread().setName(this.getClass().getSimpleName() + " " + file.getName());
try
{
if (requestId != null && chunkOffset != null && !once)
{
once = true;
chunkUpload();
}
}
catch (IOException | ParserConfigurationException | SAXException e)
{
e.printStackTrace();
}
finally
{
if (fileInputStream != null)
try
{
fileInputStream.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
private void chunkUpload() throws FileNotFoundException, IOException, ParserConfigurationException,
SAXException
{
String fileName = file.getName();
HttpPost post = new HttpPost(new StringBuilder(url).append("&requestId=").append(requestId).toString());
post.setHeader("Authtoken", params.get("token"));
long offset = Long.parseLong(chunkOffset);
if (offset + chunkLength >= fileSize)
{
bytes = new byte[(int) (fileSize - offset)];
eof = "1";
}
else
bytes = new byte[chunkLength];
post.setHeader("FileEOF", eof);
fileInputStream.skip(offset);
fileInputStream.read(bytes);
post.setEntity(new ByteArrayEntity(bytes));
HttpResponse response = HttpClientBuilder.create().build().execute(post);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK)
{
String responseString = new BasicResponseHandler().handleResponse(response);
// System.out.println("Response String " + responseString);
Element documentElement = getXMLDocument(responseString).getDocumentElement();
int errorCode = Integer.parseInt(documentElement.getAttribute("errorCode"));
if (errorCode == 200)
{
if (eof.equals("0"))
{
chunkOffset = documentElement.getAttribute("chunkOffset");
// System.out.println("Upload response for " + fileName + " " + requestId + " " + chunkOffset);
chunkUpload();
}
else
{
System.out.println("Upload done for " + fileName);
fileIDMap.put(file.getParentFile().getName() + "\\" + fileName,
documentElement.getAttribute("fileID"));
}
}
else
{
String errorString = documentElement.getAttribute("errorString");
throw new ClientProtocolException("Error under upload for " + fileName + "! Error code: "
+ errorCode + "\nError String: " + errorString);
}
}
else if (statusCode == HttpStatus.SC_CONFLICT)
{
InputStream inputStream = response.getEntity().getContent();
Element documentElement = getXMLDocument(IOUtils.toString(inputStream, "UTF-8")).getDocumentElement();
IOUtils.closeQuietly(inputStream);
requestId = documentElement.getAttribute("requestId");
chunkOffset = documentElement.getAttribute("chunkOffset");
System.out.println("Conflict at chunkUpload for " + fileName + ". Resuming with RequestId: "
+ requestId + " . ChunkOffset: " + chunkOffset);
chunkUpload();
}
else
{
throw new ClientProtocolException("Failed under upload for " + fileName + "! Status code: "
+ statusCode + "\nReason: " + response.getStatusLine().getReasonPhrase());
}
}
}
}
I need some advice whether the current approach is good because I have doubts that the recursive call is going to hurt the progress for files with really huge size. If this is the case, what will be the approach for iterative solution?