Binary file added regression-tests/datasets/Kmeans.mar
Binary file not shown.
Binary file added regression-tests/datasets/kmeans_test.mar
Binary file not shown.
Empty file.
Binary file added regression-tests/lib/__init__.pyc
Binary file not shown.
37 changes: 37 additions & 0 deletions regression-tests/lib/config.py
@@ -0,0 +1,37 @@
# vim: set encoding=utf-8

# Copyright (c) 2016 Intel Corporation 
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Global Config file for testcases, used heavily by automation"""
import os


root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
dataset_directory = os.path.join(root, "regression-tests", "datasets")
hdfs_namenode = os.getenv("CDH_MASTER", "localhost")
user = os.getenv("USER", "hadoop")
# True when tests should run against YARN (the default); any other RUN_MODE
# value selects a local TkContext in sparktk_test.get_context()
run_mode = os.getenv("RUN_MODE", "yarn_client") == "yarn_client"
hostname = os.getenv("HOSTNAME")

# HDFS paths must NOT be built with os.path.join, since HDFS always uses
# '/' as its path separator regardless of the local system's separator
hdfs_user_root = "/user/" + user
hdfs_data_dir = hdfs_user_root + "/qa_data"
checkpoint_dir = hdfs_user_root + "/sparktk_checkpoint"
export_dir = "hdfs://"+hostname+":8020"+hdfs_user_root+"/sparktk_export"

scoring_engine_host = os.getenv("SCORING_ENGINE_HOST", "127.0.0.1")
port = 8020  # default port for scoring_utils.scorer
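
# Example environment overrides (hypothetical host names):
#   export CDH_MASTER=namenode.example.com
#   export SCORING_ENGINE_HOST=scoring.example.com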
Binary file added regression-tests/lib/config.pyc
Binary file not shown.
95 changes: 95 additions & 0 deletions regression-tests/lib/scoring_utils.py
@@ -0,0 +1,95 @@
# vim: set encoding=utf-8

# Copyright (c) 2016 Intel Corporation 
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Library to support scoring in TAP for the ATK service """
import subprocess as sp
import requests
import time
import signal
import os
import config


class scorer(object):
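"""Context manager that launches a scoring engine server and tears it down on exit"""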

def __init__(self, hdfs_path=None, port=config.port, pipeline=False, host=config.scoring_engine_host):
"""Set up the server location, port and model file"""
self.name = host.split('.')[0]
self.host = host
self.pipeline = pipeline
self.port = port
self.scoring_process = None
self.hdfs_path = hdfs_path
self.full_host_url = "http://" + str(self.host) + ":" + str(self.port)

def __enter__(self):
"""Activate the Server"""
# change current working directory to point at scoring_engine dir
run_path = os.path.abspath(os.path.join(config.root, "model-scoring-core"))

# keep track of cwd so we can restore it afterwards
test_dir = os.getcwd()
os.chdir(run_path)

# make a new process group
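# (preexec_fn=os.setsid starts the server in its own session and process
# group, letting __exit__ kill the server and all of its children with a
# single os.killpg call)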
if self.hdfs_path:
self.scoring_process = sp.Popen(["./bin/scoring-server.sh",
"-Dtrustedanalytics.scoring-engine.archive-mar=%s" % self.hdfs_path,
"-Dtrustedanalytics.scoring.host=%s" % self.host,
"-Dtrustedanalytics.scoring.port=%s" % self.port], preexec_fn=os.setsid)
else:
self.scoring_process = sp.Popen(["./bin/scoring-server.sh",
"-Dtrustedanalytics.scoring.host=%s" % self.host,
"-Dtrustedanalytics.scoring.port=%s" % self.port], preexec_fn=os.setsid)

# restore cwd
os.chdir(test_dir)

# wait for server to start
time.sleep(20)

return self

def __exit__(self, *args):
"""Teardown the server"""
# Get the process group to kill all of the suprocesses
pgrp = os.getpgid(self.scoring_process.pid)
os.killpg(pgrp, signal.SIGKILL)
time.sleep(50)

def upload_mar_bytes(self, file_bytes):
"""gives mar file to empty scoring server as bytes data"""
requests.post(url=self.full_host_url + "/uploadMarBytes",
data=file_bytes,
headers={"Content-type": "application/octet-stream"})

def upload_mar_file(self, files):
"""gives a mar file to a an empty scoring server"""
requests.post(url=self.full_host_url + "/uploadMarFile", files=files)

def score(self, data_val):
"""score the json set data_val"""

# Magic headers to make the server respond appropriately
# Ask the head of scoring why these
headers = {'Content-type': 'application/json',
'Accept': 'application/json,text/plain'}

response = requests.post(
self.full_host_url + "/v2/score", json={"records": data_val}, headers=headers)

return response
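
# A minimal usage sketch (the mar path below is hypothetical):
#
#   with scorer("/user/hadoop/qa_data/Kmeans.mar") as model_server:
#       response = model_server.score([{"data": 2.0}])
#       result = response.json()["data"]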
Binary file added regression-tests/lib/scoring_utils.pyc
Binary file not shown.
109 changes: 109 additions & 0 deletions regression-tests/lib/sparktk_test.py
@@ -0,0 +1,109 @@
# vim: set encoding=utf-8

# Copyright (c) 2016 Intel Corporation 
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Setup up tests for regression """

import unittest
import uuid
import datetime
import os
import daaltk

import sparktk as stk

import config
from threading import Lock
udf_lib_path = os.path.join(config.root, "regression-tests", "sparktkregtests", "lib", "udftestlib")
udf_files = [os.path.join(udf_lib_path, f) for f in os.listdir(udf_lib_path)]

lock = Lock()
global_tc = None


def get_context():
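"""Return the process-wide TkContext, creating it once under the lock."""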
global global_tc
with lock:
if global_tc is None:
sparktkconf_dict = {'spark.driver.maxPermSize': '512m',
'spark.ui.enabled': 'false',
'spark.driver.maxResultSize': '2g',
'spark.dynamicAllocation.enabled': 'true',
'spark.dynamicAllocation.maxExecutors': '16',
'spark.dynamicAllocation.minExecutors': '1',
'spark.executor.cores': '2',
'spark.executor.memory': '2g',
'spark.shuffle.io.preferDirectBufs': 'true',
'spark.shuffle.service.enabled': 'true',
'spark.yarn.am.waitTime': '1000000',
'spark.yarn.executor.memoryOverhead': '384',
'spark.eventLog.enabled': 'false',
'spark.sql.shuffle.partitions': '6'}
if config.run_mode:
global_tc = stk.TkContext(master='yarn-client', extra_conf_dict=sparktkconf_dict, py_files=udf_files)
else:
global_tc = stk.TkContext(py_files=udf_files)

return global_tc


class SparkTKTestCase(unittest.TestCase):
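"""Base class for sparktk regression tests: shared TkContext plus HDFS/dataset path helpers."""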

@classmethod
def setUpClass(cls):
"""Build the context for use"""
cls.context = get_context()
cls.context.sc.setCheckpointDir(config.checkpoint_dir)

def setUp(self):
pass

def tearDown(self):
pass

@classmethod
def tearDownClass(cls):
pass

def get_file(self, filename):
"""Return the hdfs path to the given file"""
# Note this is an HDFS path, not a local filesystem path; os.path
# functions could build it with the wrong separator
placed_path = config.hdfs_data_dir + "/" + filename
return placed_path

def get_export_file(self, filename):
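"""Return the hdfs export path for the given file"""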
# Note this is an HDFS path, not a local filesystem path; os.path
# functions could build it with the wrong separator
placed_path = config.export_dir + "/" + filename
return placed_path

def get_name(self, prefix):
"""build a guid hardened unique name """
datestamp = datetime.datetime.now().strftime("%m_%d_%H_%M_")
name = prefix + datestamp + uuid.uuid1().hex
return name

def get_local_dataset(self, dataset):
"""gets the dataset from the dataset folder"""
dataset_directory = config.dataset_directory
return os.path.join(dataset_directory, dataset)

def assertFramesEqual(self, frame1, frame2):
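"""Assert that two frames contain the same rows, ignoring row order"""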
frame1_take = frame1.take(frame1.count())
frame2_take = frame2.take(frame2.count())

self.assertItemsEqual(frame1_take, frame2_take)
Binary file not shown.
118 changes: 118 additions & 0 deletions regression-tests/testcases/python_scoring_engine_endpoint_test.py
@@ -0,0 +1,118 @@
# vim: set encoding=utf-8

# Copyright (c) 2016 Intel Corporation 
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" test cases for scala scoring engine """
import unittest
import os
from lib import scoring_utils
from sparktkregtests.lib import sparktk_test


class ScalaScoring(sparktk_test.SparkTKTestCase):
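"""Tests for the scoring engine's upload and score endpoints, using a kmeans model"""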

def setUp(self):
"""Import the files to test against."""
super(ScalaScoring, self).setUp()
self.schema = [("data", float),
("name", str)]
self.train_data = [[2, "ab"], [1, "cd"], [7, "ef"],
[1, "gh"], [9, "ij"], [2, "jk"],
[0, "mn"], [6, "op"], [5, "qr"]]
self.test_data = [[0, "ab"], [1, "cd"], [4, "ef"],
[3, "gh"], [4, "ij"], [5, "jk"],
[10, "mn"], [10, "op"], [2, "qr"]]
self.frame_train = self.context.frame.create(self.train_data, schema=self.schema)
self.frame_test = self.context.frame.create(self.test_data, schema=self.schema)
reg_test_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self.mar_path = os.path.join(reg_test_root, "datasets", "Kmeans.mar")
self.model = self.context.models.clustering.kmeans.train(self.frame_train, ["data"], 3, seed=5)

def test_model_scoring_upload_mar_file_simple(self):
"""test creating an empty scoring server then upload mar file and score"""
expected_res = self.model.predict(self.frame_test)

files = {"file": open(self.mar_path, 'rb')}

with scoring_utils.scorer() as scorer:
scorer.upload_mar_file(files)
self._score_and_compare_expected_actual_result(expected_res, scorer)

def test_model_scoring_simple(self):
"""simple model scoring test"""
expected_result = self.model.predict(self.frame_test)

with scoring_utils.scorer(self.mar_path) as scorer:
self._score_and_compare_expected_actual_result(expected_result, scorer)

def test_model_scoring_send_model_bytes(self):
"""start empty scoring server and test sending model as bytes"""
expected_res = self.model.predict(self.frame_test)

with open(self.mar_path, 'rb') as mar_file:
bytes_data = mar_file.read()

with scoring_utils.scorer() as scorer:
scorer.upload_mar_bytes(bytes_data)
self._score_and_compare_expected_actual_result(expected_res, scorer)

@unittest.skip("scoring_engine: sending model twice to scoring engine should provide nice error message")
def test_send_model_twice(self):
"""test to ensure that sending two models fails"""
with scoring_utils.scorer(self.mar_path) as scorer:
scorer.upload_mar_file({"file": open(self.mar_path, 'rb')})
response = scorer.score({"data": 2})
self.assertTrue("500" in str(response))

def test_score_no_model(self):
"""test scoring on a score server started without a model"""
with scoring_utils.scorer() as scorer:
response = scorer.score({"data": 2})
self.assertTrue("500" in str(response))

def test_score_invalid_data(self):
"""start a scoring server with valid mar but send invalid data"""
with scoring_utils.scorer(self.mar_path) as scorer:
response = scorer.score({"data": "apple"})
self.assertTrue("500" in str(response))

def _score_and_compare_expected_actual_result(self, expected, scorer):
"""compare predict and score result"""
# get a pandas frame from the expected result for ease of access
pandas_res = expected.to_pandas()
# map each predict cluster label to its scoring-engine equivalent;
# the two may label the same cluster differently (e.g., predict may
# call a cluster 0 that the scoring engine labels 1), and we only
# care that the groupings match, not the labels themselves
map_cluster_labels = {}

# iterate through the pandas predict result
for (index, row) in pandas_res.iterrows():
score_result = scorer.score([{"data": row["data"]}])
score = score_result.json()["data"][0]["score"]

# if we have not yet seen this cluster label, add it to our dict;
# if we have already seen it, ensure the mapped score is the same
if row["cluster"] not in map_cluster_labels:
map_cluster_labels[row["cluster"]] = score
else:
self.assertEqual(map_cluster_labels[row["cluster"]], score)


if __name__ == '__main__':
unittest.main()