Context: The code below runs a series of steps in an ETL workflow. I would like it reviewed to help avoid memory bottlenecks, null pointer exceptions, and similar problems.

Use case:

Ingest data from an RDBMS into a Hadoop Hive database using Sqoop.

The code includes the following items:

  • Logging job activity (JobID, StartTime, StopTime, RunTime, etc.) into a log table.
  • Connecting to the Hive database through the JDBC API for logging and other purposes.
  • A class with a shell utility and a Sqoop utility, used to invoke the Sqoop command: it deletes an HDFS target directory if it exists and then imports all the tables into Hive.
  • A mailer class that emails the end user on job success or failure.

My aim in writing the classes below is to create libraries that can be reused in future workflows. I find the code very cumbersome, with no standards and no exception handling, so I would like someone to review it and help me make it better.

Summary:

The code extracts data from one database and loads it into another. I want the review to focus purely on exception handling, the way the methods call one another, and which design pattern could be used.
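To make the design-pattern part of the question concrete, here is a minimal sketch of the direction I have in mind (EtlStep, PipelineSketch, and the step bodies are placeholders, not part of the real code): each step either succeeds or throws, and the driver stops at the first failure instead of every method re-checking a shared flag.

interface EtlStep {
    void run() throws Exception;
}

public class PipelineSketch {
    public static void main(String[] args) {
        // Placeholder steps standing in for startTimeLogger, importTables, getCounts, ...
        EtlStep[] steps = {
            () -> System.out.println("log start time"),
            () -> System.out.println("run sqoop import"),
            () -> System.out.println("collect record counts")
        };
        for (EtlStep step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                System.out.println("Load failed: " + e.getMessage());
                return; // stop the workflow at the first failing step
            }
        }
        System.out.println("Load completed");
    }
}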


MainApp.java (This is the class containing the main method; it invokes all the other methods in order)

package com.cisco.installbase.hiveconnector;

import java.util.Date;

public class MainApp {

    private static final String hiveDB = ReadProperties.getInstance().getProperty("hive_db");
    private static final String logTable = ReadProperties.getInstance().getProperty("IB_log_table");
    private static final String dataGovernanceLogTable = ReadProperties.getInstance().getProperty("SR_DG_table");
    private static final String dataGovernanceMasterTable = ReadProperties.getInstance()
            .getProperty("SR_DG_master_table");

    private static final String count_xxccs_ds_sahdr_core = "select count(*) from " + hiveDB + "."
            + "xxccs_ds_sahdr_core";
    private static final String count_mtl_system_items_b = "select count(*) from " + hiveDB + "."
            + "mtl_system_items_b";
    private static final String count_xxccs_scdc_product_profile = "select count(*) from " + hiveDB + "."
            + "xxccs_scdc_product_profile";
    private static final String count_xxccs_ds_cvdprdline_detail = "select count(*) from " + hiveDB + "."
            + "xxccs_ds_cvdprdline_detail";
    private static final String count_xxccs_ds_instance_detail = "select count(*) from " + hiveDB + "."
            + "xxccs_ds_instance_detail";

    private static int currentJobID = 0;
    private static Date startTime = null;
    private static Date stopTime = null;
    private static int runTime = 0;

    static CommonDBUtilities commonDB = new CommonDBUtilities();
    static ShellUtilities shellUtilities = new ShellUtilities();
    static SqoopUtility sqoop = new SqoopUtility();

    public static void main(String[] args) {

        System.out.println("Started the Job");
        MainApp.startTimeLogger();

    }

    public static void startTimeLogger() {
        // getting the Job ID and the start time for the log table

        if (Constants.isFlag()) {
            currentJobID = commonDB.getMaximumJobID();
            startTime = commonDB.getTime();
            MainApp.importTables();
            System.out.println("executing startTimeLogger");
        } else {
            MainApp.onFailure();
            JobMailer.PostMail("IB Load Failed", "Load failed while logging method name startTimeLogger()");
            System.out.println("executing startTimeLogger failed");
        }
    }

    public static void importTables() {
        // Delete target directory before running the sqoop imports

        if (Constants.isFlag()) {
            shellUtilities.DeleteDirectory(Constants.getMtlSystems());
            shellUtilities.DeleteDirectory(Constants.getProductLine());
            shellUtilities.DeleteDirectory(Constants.getInstanceDetail());
            shellUtilities.DeleteDirectory(Constants.getProductProfile());
            shellUtilities.DeleteDirectory(Constants.getHeaderCore());

            // Run the sqoop imports to load the data from oracle to hive

            sqoop.runSqoop();
            MainApp.getCounts();
            System.out.println("executing importTables");
        } else {
            MainApp.onFailure();
            JobMailer.PostMail("IB Load Failed", "Load failed while running sqoop import method name importTables()");
            System.out.println("executing importTables failed");
        }

    }

    public static void getCounts() {

        // Get the record counts for all the IB tables pulled

        if (Constants.isFlag()) {
            commonDB.getCounts(count_xxccs_ds_instance_detail);
            commonDB.getCounts(count_xxccs_ds_cvdprdline_detail);
            commonDB.getCounts(count_xxccs_scdc_product_profile);
            commonDB.getCounts(count_mtl_system_items_b);
            commonDB.getCounts(count_xxccs_ds_sahdr_core);
            MainApp.stopTimeLogger();
            System.out.println("executing getCounts");
        } else {
            MainApp.onFailure();
            JobMailer.PostMail("IB Load Failed", "Load failed while getting counts method name getCounts()");
            System.out.println("executing getCounts failed");
        }
    }

    public static void stopTimeLogger() {
        // Get the stop time or end time
        if (Constants.isFlag()) {
            stopTime = commonDB.getTime();
            MainApp.runTimeLogger();
            System.out.println("executing stopTimeLogger");
        } else {
            MainApp.onFailure();
            JobMailer.PostMail("IB Load Failed", "Load failed while end logging method name stopTimeLogger()");
            System.out.println("executing stopTimeLogger failed");
        }
    }

    public static void runTimeLogger() {
        // Get the run time or total time taken
        if (Constants.isFlag()) {
            runTime = (int) ((stopTime.getTime() - startTime.getTime()) / 1000); // elapsed seconds
            MainApp.onSuccess();
            MainApp.logGovernance();
            System.out.println("executing runTimeLogger");
        } else {
            MainApp.onFailure();
            JobMailer.PostMail("IB Load Failed", "Load failed while runtime logging method name runTimeLogger()");
            System.out.println("executing runTimeLogger failed");
        }
    }

    public static void logGovernance() {
        // IB Data governance

        if (Constants.isFlag()) {
            String dataGovernance = "Insert into table " + hiveDB + "." + dataGovernanceLogTable
                    + " select Data_Asset_Reference,File_Name,Origin_System,Transfer_System," + startTime
                    + ",Column_Reference,Element_Reference,Rule_Priority,Delete_By_Date,Classification,Geographic_Inclusion,Geographic_Restriction,Group_Inclusion,Group_Restriction,Reserved from "
                    + hiveDB + "." + dataGovernanceMasterTable;
            commonDB.InsertToTable(dataGovernance);
            System.out.println("executing logGovernance");
        } else {
            MainApp.onFailure();
            JobMailer.PostMail("IB Load Failed",
                    "Load failed while inserting into datagovernance method name logGovernance()");
            System.out.println("executing logGovernance failed");
        }
    }

    public static void onFailure() {
        // Write to log on Failure
        String insertOnFailure = "insert into table " + hiveDB + "." + logTable + " select " + currentJobID + ","
                + stopTime + "," + runTime + "," + "FAILED from " + hiveDB + "." + "dual" + " limit 1; ";
        commonDB.InsertToTable(insertOnFailure);
        JobMailer.PostMail("IB Load Failed", "Load failed");
        System.out.println("executing onFailure");
    }

    public static void onSuccess() {
        // Write to log on Success
        String insertOnSuccess = "insert into table " + hiveDB + "." + logTable + " select " + currentJobID + ","
                + stopTime + "," + runTime + "," + "SUCCESS from " + hiveDB + "." + "dual" + " limit 1; ";
        commonDB.InsertToTable(insertOnSuccess);
        JobMailer.PostMail("IB Load Successfully completed", "Load completed");
        System.out.println("executing onSuccess");
    }

}
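As an aside on the five count constants above: since the queries differ only in the table name, they could be generated in a loop (a sketch, using the same tables and getCounts method shown above):

    // Sketch: derive the count queries from the table names instead of
    // keeping five nearly identical String constants
    String[] tables = { "xxccs_ds_sahdr_core", "mtl_system_items_b", "xxccs_scdc_product_profile",
            "xxccs_ds_cvdprdline_detail", "xxccs_ds_instance_detail" };
    for (String table : tables) {
        commonDB.getCounts("select count(*) from " + hiveDB + "." + table);
    }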

ReadProperties.java (Reads the properties from the property file and exposes them to the other classes)

package com.cisco.installbase.hiveconnector;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Set;

public class ReadProperties {

    private final Properties props = new Properties();

    private ReadProperties() {
        InputStream in = this.getClass().getClassLoader().getResourceAsStream("config.properties");
        try {
            if (in == null) {
                // getResourceAsStream returns null when the file is missing;
                // without this check, props.load(null) throws a NullPointerException
                throw new IOException("config.properties not found on the classpath");
            }
            props.load(in);
            Constants.setFlag(true);
        } catch (IOException e) {
            e.printStackTrace();
            Constants.setFlag(false);
        }
    }

    private static class PropHolder {
        private static final ReadProperties INSTANCE = new ReadProperties();
    }

    public static ReadProperties getInstance() {
        return PropHolder.INSTANCE;
    }

    public String getProperty(String key) {
        return props.getProperty(key);
    }

    public Set<String> getAllPropertyNames() {
        return props.stringPropertyNames();
    }

    public boolean containsKey(String key) {
        return props.containsKey(key);
    }
}
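Since Properties.getProperty returns null for a missing key, every caller above (MainApp, CommonDBUtilities, SqoopUtility) can silently receive null, which feeds directly into the NullPointerException worry. A possible fail-fast variant inside ReadProperties (getRequiredProperty is a hypothetical addition, not in the real code) could be:

    // Hypothetical addition to ReadProperties: fail fast with a clear message
    // instead of letting a null value surface later as a NullPointerException
    public String getRequiredProperty(String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return value;
    }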

CommonDBUtilities.java (This class has all the utilities required for DB operations)

package com.cisco.installbase.hiveconnector;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;

public class CommonDBUtilities {

    static final String hiveDB = ReadProperties.getInstance().getProperty("hive_db");
    static final String logTable = ReadProperties.getInstance().getProperty("IB_log_table");

    Connection con = CreateConnection.getInstance();
    Statement stm = null;
    ResultSet rs = null;
    PreparedStatement pstat = null;

    String sql1 = "select max(job_id)+1 from " + hiveDB + "." + logTable;

    public Date getTime() {
        return new Date();
    }

    public int getMaximumJobID() {
        int maximumID = -1;
        try {
            stm = con.createStatement();
            rs = stm.executeQuery(sql1);
            if (rs.next()) { // the cursor starts before the first row
                maximumID = rs.getInt(1); // JDBC columns are 1-indexed
            }
            Constants.setFlag(true);
        } catch (SQLException e) {
            e.printStackTrace();
            Constants.setFlag(false);
        }
        return maximumID;
    }

    public int getCounts(String query) {
        int count = 0;
        try {
            stm = con.createStatement();
            rs = stm.executeQuery(query);
            if (rs.next()) {
                count = rs.getInt(1);
            }
            Constants.setFlag(true);
        } catch (SQLException e) {
            e.printStackTrace();
            Constants.setFlag(false);
        }
        return count;
    }

    public void InsertToTable(String insertquery) {
        try {
            pstat = con.prepareStatement(insertquery);
            pstat.executeUpdate(); // prepareStatement alone never runs the insert
            Constants.setFlag(true);
        } catch (SQLException e) {
            e.printStackTrace();
            Constants.setFlag(false);
        }
    }
}
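The Statement and ResultSet fields above are never closed, which is one of my memory concerns. For comparison, a sketch of getCounts using try-with-resources (Java 7+), which closes both even when the query fails:

    // Sketch: local, auto-closed JDBC resources instead of instance fields;
    // the SQLException propagates to the caller instead of setting a flag
    public int getCounts(String query) throws SQLException {
        try (Statement stm = con.createStatement();
                ResultSet rs = stm.executeQuery(query)) {
            return rs.next() ? rs.getInt(1) : 0; // JDBC columns are 1-indexed
        }
    }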

CreateConnection.java (Singleton class which creates a connection object to the database)

package com.cisco.installbase.hiveconnector;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Connection;

public class CreateConnection {

    private static Connection instance = null;
    static final String drivername = "org.apache.hive.jdbc.HiveDriver";

    private CreateConnection() {

        try {
            Class.forName(drivername);
            // instance =
            // DriverManager.getConnection("jdbc:hive2://hddev-c01-edge-01:20000/",
            // "phodisvc", "B1GD4T4dev");
            // for hive 1 use this ------> instance =
            // DriverManager.getConnection("thrift://hddev-c01-edge-02:9083");
            instance = DriverManager.getConnection("jdbc:hive://hddev-c01-edge-01:9083/");
            System.out.println("get instance" + instance);
            Constants.setFlag(true);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            Constants.setFlag(false);
        } catch (SQLException e) {
            e.printStackTrace();
            Constants.setFlag(false);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static Connection getInstance() {
        if (instance == null) {
            // The constructor assigns the shared Connection to the static field;
            // casting the CreateConnection object itself to Connection would
            // throw a ClassCastException
            new CreateConnection();
        }
        return instance;
    }
}
}
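For what it is worth, the same initialization-on-demand holder idiom already used by ReadProperties would also make the connection singleton thread-safe. HiveConnection below is a hypothetical name, and the URL is the one from the constructor above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical sketch: the holder idiom gives lazy, thread-safe initialization
public final class HiveConnection {

    private static class Holder {
        static final Connection INSTANCE = create();
    }

    private static Connection create() {
        try {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            return DriverManager.getConnection("jdbc:hive://hddev-c01-edge-01:9083/");
        } catch (ClassNotFoundException | SQLException e) {
            // Surface the failure instead of handing callers a null connection
            throw new ExceptionInInitializerError(e);
        }
    }

    public static Connection getInstance() {
        return Holder.INSTANCE;
    }
}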

ShellUtilities.java (Utility class for command-line and other shell-related operations)

package com.cisco.installbase.hiveconnector;

import java.io.IOException;
//import java.net.URI;
//import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShellUtilities {

    String target_dir = ReadProperties.getInstance().getProperty("target_dir");
    String tablename = "";
    // String maprfsURI = "maprfs://hdnprd-c01-r01-01:7222";

    Configuration conf = new Configuration();
    FileSystem fs = null;
    String dir = " ";

    public void DeleteDirectory(String tablename) {
        String fullpath = target_dir + tablename;
        Path directory = new Path(fullpath);
        try {
            // fs = FileSystem.get(new URI(maprfsURI), conf);
            fs = FileSystem.get(conf);
            // Everything stays in one try block: if FileSystem.get fails,
            // fs remains null and calling fs.exists separately would throw
            // a NullPointerException
            if (fs.exists(directory)) {
                fs.delete(directory, true);
            }
            Constants.setFlag(true);
        } catch (IOException e) {
            e.printStackTrace();
            Constants.setFlag(false);
        }
    }
}
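An alternative shape for the same delete, reporting the outcome to the caller rather than through the shared flag (a sketch inside ShellUtilities, using the same Hadoop FileSystem calls):

    // Sketch: a return value lets the caller decide how to react to a failure
    public boolean deleteDirectory(String tablename) {
        Path directory = new Path(target_dir + tablename);
        try {
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(directory)) {
                return fs.delete(directory, true); // recursive delete
            }
            return true; // nothing to delete counts as success
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }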

SqoopUtility.java (Utility class to run a shell script)

package com.cisco.installbase.hiveconnector;

import java.io.IOException;

public class SqoopUtility {

    private static final String connectionString = "'" + ReadProperties.getInstance().getProperty("dburl") + "'";
    @SuppressWarnings("unused")
    private static final String edgeNode = ReadProperties.getInstance().getProperty("EDGE_HIVE_CONN");
    private static final String targetDir = ReadProperties.getInstance().getProperty("target_dir") + "/";
    private static final String userName = ReadProperties.getInstance().getProperty("user_name");
    private static final String password = ReadProperties.getInstance().getProperty("password");
    private static final String sqoopEdgeNode = ReadProperties.getInstance().getProperty("SQOOP_EDGE_CONN");
    private static final String hiveDB = ReadProperties.getInstance().getProperty("hive_db");

    String[] command = { "sh", "/apps/pentaho_nfs/installbase/input/poc/parallel.sh", sqoopEdgeNode, connectionString,
            userName, password, targetDir, hiveDB };
    ProcessBuilder processBuilder = null;
    private Process spawnProcess = null;

    public void runSqoop() {
        processBuilder = new ProcessBuilder(command);
        try {
            spawnProcess = processBuilder.start();
            // Wait for parallel.sh to finish so the downstream record counts
            // run against fully loaded tables
            int exitCode = spawnProcess.waitFor();
            Constants.setFlag(exitCode == 0);
        } catch (IOException e) {
            e.printStackTrace();
            Constants.setFlag(false);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
            Constants.setFlag(false);
        }
    }
}
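One more detail for reviewers: ProcessBuilder pipes the child's stdout and stderr by default, and a pipe nobody reads can fill up and stall parallel.sh. A sketch of redirecting the streams with the same ProcessBuilder API (runSqoopInheritingIO is a hypothetical method name):

    // Sketch: inheritIO sends the script's output to this JVM's console,
    // so the child process can never block on a full pipe buffer
    public void runSqoopInheritingIO() throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.inheritIO();
        int exitCode = pb.start().waitFor();
        Constants.setFlag(exitCode == 0); // 0 means parallel.sh succeeded
    }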

Constants.java (Class having all the constants)

package com.cisco.installbase.hiveconnector;

public class Constants {

    private static boolean flag = false;

    public static boolean isFlag() {
        return flag;
    }

    public static void setFlag(boolean flag) {
        Constants.flag = flag;
    }

    private static String mtlSystems = "MTL_SYSTEM_ITEMS_B";
    private static String productLine = "XXCCS_DS_CVDPRDLINE_DETAIL";
    private static String instanceDetail = "XXCCS_DS_INSTANCE_DETAIL";
    private static String productProfile = "XXCCS_SCDC_PRODUCT_PROFILE";
    private static String headerCore = "XXCCS_DS_SAHDR_CORE";

    public static String getMtlSystems() {
        return mtlSystems;
    }

    public static String getProductLine() {
        return productLine;
    }

    public static String getInstanceDetail() {
        return instanceDetail;
    }

    public static String getProductProfile() {
        return productProfile;
    }

    public static String getHeaderCore() {
        return headerCore;
    }
}
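Since the five table names are a fixed, related set, an enum might express them more directly than private Strings with getters (IbTable is a hypothetical name, not part of the real code):

// Hypothetical sketch: an enum in place of the five table-name constants;
// name() yields the table name and values() lets callers iterate all five
public enum IbTable {
    MTL_SYSTEM_ITEMS_B,
    XXCCS_DS_CVDPRDLINE_DETAIL,
    XXCCS_DS_INSTANCE_DETAIL,
    XXCCS_SCDC_PRODUCT_PROFILE,
    XXCCS_DS_SAHDR_CORE
}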

You said something about NullPointerExceptions. Does the code work correctly, to the best of your knowledge? – 200_success Feb 10 at 18:35

Yes, the code works. – dataEnthusiast Feb 11 at 1:25
