From 6bec568a37986156a15a9b955c38ebe42196b7e2 Mon Sep 17 00:00:00 2001 From: jitendra42 Date: Fri, 3 Feb 2017 11:05:59 -0800 Subject: [PATCH 1/2] adding s3/hdfs support for scoring engine --- .../scoring/ScoringEngineHelper.scala | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/model-scoring-core/src/main/scala/org/trustedanalytics/scoring/ScoringEngineHelper.scala b/model-scoring-core/src/main/scala/org/trustedanalytics/scoring/ScoringEngineHelper.scala index ca8211d..25780e4 100644 --- a/model-scoring-core/src/main/scala/org/trustedanalytics/scoring/ScoringEngineHelper.scala +++ b/model-scoring-core/src/main/scala/org/trustedanalytics/scoring/ScoringEngineHelper.scala @@ -41,11 +41,28 @@ object ScoringEngineHelper { def getModel(marFilePath: String): Model = { if (marFilePath != "") { - logger.info("calling ModelArchiveFormat to get the model") - ModelArchiveFormat.read(new File(marFilePath), this.getClass.getClassLoader, None) - } - else { - null + try { + logger.info("Calling ModelArchiveFormat.read() to load the model stored locally") + return ModelArchiveFormat.read(new File(marFilePath), this.getClass.getClassLoader, None) + } catch { + case e: Exception => + try { + logger.info("Unale to load model from local filesystem, trying to load the model from HDFS") + tempMarFile = File.createTempFile("model", ".mar") + hdfsFileSystem.copyToLocalFile(false, new Path(marFilePath), new Path(tempMarFile.getAbsolutePath)) + val hdfsMarFilePath = tempMarFile.getAbsolutePath + sys.addShutdownHook(FileUtils.deleteQuietly(tempMarFile)) // Delete temporary directory on exit + return ModelArchiveFormat.read(new File(hdfsMarFilePath), this.getClass.getClassLoader, None) + } catch { + case e: Exception => + logger.info("Unale to load model from HDFS...") + return null + } finally { + FileUtils.deleteQuietly(tempMarFile) + } + } + } else { + return null } } } \ No newline at end of file From 1e667a0ca06afe97e03b0415dcf08fa80ae75880 Mon Sep 17 00:00:00 2001 From: jitendra42 Date: Fri, 3 Feb 2017 16:06:31 -0800 Subject: [PATCH 2/2] adding s3 support for scoring engine --- model-scoring-core/pom.xml | 5 +++ .../src/main/resources/reference.conf | 7 ++++ .../scoring/ScoringEngineHelper.scala | 34 ++++++++++++++++--- 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/model-scoring-core/pom.xml b/model-scoring-core/pom.xml index e7ec13d..f364bec 100644 --- a/model-scoring-core/pom.xml +++ b/model-scoring-core/pom.xml @@ -589,5 +589,10 @@ maven-artifact 2.2.1 + + org.apache.hadoop + hadoop-aws + ${tap.hadoop2.version} + \ No newline at end of file diff --git a/model-scoring-core/src/main/resources/reference.conf b/model-scoring-core/src/main/resources/reference.conf index c237fbb..36d1526 100644 --- a/model-scoring-core/src/main/resources/reference.conf +++ b/model-scoring-core/src/main/resources/reference.conf @@ -7,6 +7,13 @@ trustedanalytics.scoring-engine { archive-mar = "" } +#-Dtrustedanalytics.aws.fs.s3a.access.key="ACCESS_KEY" +trustedanalytics.aws { + fs.s3a.proxy.host = "" + fs.s3a.proxy.port = "" + fs.s3a.access.key = "" + fs.s3a.secret.key = "" +} #-Dtrustedanalytics.scoring.port="SOME_PORT" trustedanalytics.scoring { identifier = "ia" diff --git a/model-scoring-core/src/main/scala/org/trustedanalytics/scoring/ScoringEngineHelper.scala b/model-scoring-core/src/main/scala/org/trustedanalytics/scoring/ScoringEngineHelper.scala index 25780e4..ce377a7 100644 --- a/model-scoring-core/src/main/scala/org/trustedanalytics/scoring/ScoringEngineHelper.scala +++ b/model-scoring-core/src/main/scala/org/trustedanalytics/scoring/ScoringEngineHelper.scala @@ -17,14 +17,20 @@ package org.trustedanalytics.scoring import java.io.File +import java.net.URI import java.util.{ArrayList => JArrayList} +import com.typesafe.config.ConfigFactory +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.slf4j.LoggerFactory import org.trustedanalytics.model.archive.format.ModelArchiveFormat import org.trustedanalytics.scoring.interfaces.Model object ScoringEngineHelper { private val logger = LoggerFactory.getLogger(this.getClass) + val config = ConfigFactory.load(this.getClass.getClassLoader) /** * @@ -39,6 +45,21 @@ object ScoringEngineHelper { model.output().deep == revisedModel.output().deep } + private def getAWSConfig(): Configuration = { + val proxyHost = config.getString("trustedanalytics.aws.fs.s3a.proxy.host") + val proxyPort = config.getString("trustedanalytics.aws.fs.s3a.proxy.port") + val accessKey = config.getString("trustedanalytics.aws.fs.s3a.access.key") + val secretKey = config.getString("trustedanalytics.aws.fs.s3a.secret.key") + + val cfg = new Configuration() + if(proxyHost != "" && proxyPort != "") { + cfg.set("fs.s3a.proxy.host",proxyHost) + cfg.set("fs.s3a.proxy.port",proxyPort) + } + if(accessKey != "") cfg.set("fs.s3a.access.key",accessKey) else throw new Exception("Configuration do not have AWS access key") + if(secretKey != "") cfg.set("fs.s3a.secret.key",secretKey) else throw new Exception("Configuration do not have AWS secret key") + cfg + } def getModel(marFilePath: String): Model = { if (marFilePath != "") { try { @@ -46,16 +67,21 @@ object ScoringEngineHelper { return ModelArchiveFormat.read(new File(marFilePath), this.getClass.getClassLoader, None) } catch { case e: Exception => - try { - logger.info("Unale to load model from local filesystem, trying to load the model from HDFS") - tempMarFile = File.createTempFile("model", ".mar") + logger.info("Unale to load model from local filesystem, trying to load the model from HDFS") + var tempMarFile: File = null + tempMarFile = File.createTempFile("model", ".mar") + try + { + val cfg = getAWSConfig() + val hdfsFileSystem = org.apache.hadoop.fs.FileSystem.get(new URI(marFilePath), cfg) hdfsFileSystem.copyToLocalFile(false, new Path(marFilePath), new Path(tempMarFile.getAbsolutePath)) val hdfsMarFilePath = tempMarFile.getAbsolutePath sys.addShutdownHook(FileUtils.deleteQuietly(tempMarFile)) // Delete temporary directory on exit return ModelArchiveFormat.read(new File(hdfsMarFilePath), this.getClass.getClassLoader, None) } catch { case e: Exception => - logger.info("Unale to load model from HDFS...") + logger.info("Unale to load model from HDFS...\n"+e.getMessage) + logger.info("\n"+e.getStackTraceString) return null } finally { FileUtils.deleteQuietly(tempMarFile)