From 1c6849de628050475447bfe6e2c93986d37404ec Mon Sep 17 00:00:00 2001 From: Nikos Karastathis Date: Fri, 11 May 2018 09:42:33 +0200 Subject: [PATCH] Add the beta, xing, lumi level and emittance scan classes --- LHC_AutoScans.py | 65 ++++++++++++++++++++++++++++++++++++++ LHC_BetaStar.py | 65 ++++++++++++++++++++++++++++++++++++++ LHC_Crossing.py | 61 +++++++++++++++++++++++++++++++++++ LHC_LumiLevelling.py | 61 +++++++++++++++++++++++++++++++++++ lhc_log_db_query_repeat.py | 44 ++++++++++++++++++++++++++ 5 files changed, 296 insertions(+) create mode 100644 LHC_AutoScans.py create mode 100644 LHC_BetaStar.py create mode 100644 LHC_Crossing.py create mode 100644 LHC_LumiLevelling.py create mode 100644 lhc_log_db_query_repeat.py diff --git a/LHC_AutoScans.py b/LHC_AutoScans.py new file mode 100644 index 0000000..6bc3c79 --- /dev/null +++ b/LHC_AutoScans.py @@ -0,0 +1,65 @@ +import numpy as np +import TimberManager as tm + +class AutoScan: + def __init__(self, timber_variable): + + if type(timber_variable) is str: + print 'type var' + # if not (IP == 1 or IP == 5): + # raise ValueError('You need to specify which IP! (1 or 5)') + dict_timber = tm.parse_timber_file(timber_variable, verbose=True) + timber_variable_SCAN = dict_timber[get_variable_dict['AUTOSCAN']] + # if IP == 1: + # timber_variable_ENABLE = dict_timber[get_variable_dict['LUMILEVELING.IP1:ENABLE']] + # elif IP == 5: + # timber_variable_ENABLE = dict_timber[get_variable_dict['LUMILEVELING.IP5:ENABLE']] + + elif hasattr(timber_variable, '__getitem__'): + print 'type dic' + try: + timber_variable_SCAN = timber_variable['AUTOMATICSCAN:IP'] #get_variable_dict['AUTOSCAN']] + # if IP == 1: + # timber_variable_ENABLE = timber_variable['LUMILEVELING.IP1:ENABLE'] + # elif IP == 5: + # timber_variable_ENABLE = timber_variable['LUMILEVELING.IP5:ENABLE'] + except: + print '# LHC_AutoScans : No Emittance Scan Information! Returning -1' + return -1 + + self.t_stamps = timber_variable_SCAN.t_stamps + self.autoscanIP = timber_variable_SCAN.values + + + self.autoscanIP = np.array(np.float_(self.autoscanIP)).ravel() + self.t_stamps = np.array(np.float_(self.t_stamps)) + + def nearest_older_sample(self, t_obs, flag_return_time=False): + ind_min = np.argmin(np.abs(self.t_stamps - t_obs)) + if self.t_stamps[ind_min] > t_obs: + ind_min -= 1 + if flag_return_time: + if ind_min == -1: + return 0.*self.autoscanIP[ind_min], -1 + else: + return self.autoscanIP[ind_min], self.t_stamps[ind_min] + else: + if ind_min == -1: + return 0.*self.autoscanIP[ind_min] + else: + return self.autoscanIP[ind_min] + + + +def get_variable_dict(): + var_dict = {} + var_dict['AUTOSCAN'] = 'AUTOMATICSCAN:IP' + #var_dict['LUMI_LEVEL_ENABLE_IP5'] = 'LUMILEVELING.IP5:ENABLE' + + return var_dict + +def variable_list(): + var_list = [] + var_list += get_variable_dict().values() + + return var_list diff --git a/LHC_BetaStar.py b/LHC_BetaStar.py new file mode 100644 index 0000000..855fa1e --- /dev/null +++ b/LHC_BetaStar.py @@ -0,0 +1,65 @@ +import numpy as np +import TimberManager as tm + +class BetaStar: + def __init__(self, timber_variable, IP=0): + + timber_variable_BETA = None + if type(timber_variable) is str: + print 'type var' + if not (IP ==1 or IP==5): + raise ValueError('You need to specify with IP! (1 or 5)') + dict_timber = tm.parse_timber_file(timber_variable, verbose=True) + if IP == 1: + timber_variable_BETA = dict_timber[get_variable_dict['BETASTAR_IP1']] + elif IP == 5: + timber_variable_BETA = dict_timber[get_variable_dict['BETASTAR_IP5']] + + elif hasattr(timber_variable, '__getitem__'): + + try: + if IP == 1: + timber_variable_BETA = timber_variable['HX:BETASTAR_IP1'] + elif IP == 5: + timber_variable_BETA = timber_variable['HX:BETASTAR_IP5'] + except: + print '# BetaStar : No BetaStar Information! Returning -1' + return -1 + + self.t_stamps = timber_variable_BETA.t_stamps + self.beta = timber_variable_BETA.values + + + self.beta = np.array(np.float_(self.beta)).ravel() + self.t_stamps = np.array(np.float_(self.t_stamps)) + + def nearest_older_sample(self, t_obs, flag_return_time=False): + ind_min = np.argmin(np.abs(self.t_stamps - t_obs)) + if self.t_stamps[ind_min] > t_obs: + ind_min -= 1 + if flag_return_time: + if ind_min == -1: + return 0.*self.beta[ind_min], -1 + else: + return self.beta[ind_min], self.t_stamps[ind_min] + else: + if ind_min == -1: + return 0.*self.beta[ind_min] + else: + return self.beta[ind_min] + + + +def get_variable_dict(): + var_dict = {} + var_dict['BETASTAR_IP1'] = 'HX:BETASTAR_IP1' + var_dict['BETASTAR_IP5'] = 'HX:BETASTAR_IP5' + #var_dict['LUMI_LEVEL_ENABLE_IP5'] = 'LUMILEVELING.IP5:ENABLE' + + return var_dict + +def variable_list(): + var_list = [] + var_list += get_variable_dict().values() + + return var_list diff --git a/LHC_Crossing.py b/LHC_Crossing.py new file mode 100644 index 0000000..a6d8f4a --- /dev/null +++ b/LHC_Crossing.py @@ -0,0 +1,61 @@ +import numpy as np +import TimberManager as tm + +class Crossing: + def __init__(self, timber_variable, IP=0): + + if type(timber_variable) is str: + if not (IP == 1 or IP == 5): + raise ValueError('You need to specify which IP! (1 or 5)') + dict_timber = tm.parse_timber_file(timber_variable, verbose=True) + if IP == 1: + timber_variable_XING = dict_timber[get_variable_dict['LHC.RUNCONFIG:IP1-XING-V-MURAD']] + elif IP == 5: + timber_variable_XING = dict_timber[get_variable_dict['LHC.RUNCONFIG:IP5-XING-H-MURAD']] + + elif hasattr(timber_variable, '__getitem__'): + try: + if IP == 1: + timber_variable_XING = timber_variable['LHC.RUNCONFIG:IP1-XING-V-MURAD'] + elif IP == 5: + timber_variable_XING = timber_variable['LHC.RUNCONFIG:IP5-XING-H-MURAD'] + except: + print '# LHC_Crossing : No Crossing Angle Information! Returning -1' + return -1 + + self.t_stamps = timber_variable_XING.t_stamps + self.xing = timber_variable_XING.values + + + self.xing = np.array(np.float_(self.xing)).ravel() + self.t_stamps = np.array(np.float_(self.t_stamps)) + + def nearest_older_sample(self, t_obs, flag_return_time=False): + ind_min = np.argmin(np.abs(self.t_stamps - t_obs)) + if self.t_stamps[ind_min] > t_obs: + ind_min -= 1 + if flag_return_time: + if ind_min == -1: + return 0.*self.xing[ind_min], -1 + else: + return self.xing[ind_min], self.t_stamps[ind_min] + else: + if ind_min == -1: + return 0.*self.xing[ind_min] + else: + return self.xing[ind_min] + + + +def get_variable_dict(): + var_dict = {} + var_dict['CROSSING_ANGLE_IP1'] = 'LHC.RUNCONFIG:IP1-XING-V-MURAD' + var_dict['CROSSING_ANGLE_IP5'] = 'LHC.RUNCONFIG:IP5-XING-H-MURAD' + + return var_dict + +def variable_list(): + var_list = [] + var_list += get_variable_dict().values() + + return var_list diff --git a/LHC_LumiLevelling.py b/LHC_LumiLevelling.py new file mode 100644 index 0000000..aefc0d4 --- /dev/null +++ b/LHC_LumiLevelling.py @@ -0,0 +1,61 @@ +import numpy as np +import TimberManager as tm + +class LumiLevelling: + def __init__(self, timber_variable, IP=0): + + if type(timber_variable) is str: + if not (IP == 1 or IP == 5): + raise ValueError('You need to specify which IP! (1 or 5)') + dict_timber = tm.parse_timber_file(timber_variable, verbose=True) + if IP == 1: + timber_variable_ENABLE = dict_timber[get_variable_dict['LUMI_LEVEL_ENABLE_IP1']] + elif IP == 5: + timber_variable_ENABLE = dict_timber[get_variable_dict['LUMI_LEVEL_ENABLE_IP5']] + + elif hasattr(timber_variable, '__getitem__'): + try: + if IP == 1: + timber_variable_ENABLE = timber_variable['LUMILEVELING.IP1:ENABLE'] + elif IP == 5: + timber_variable_ENABLE = timber_variable['LUMILEVELING.IP5:ENABLE'] + except: + print '# LHC_LumiLevelling : No Lumi Levelling Information! Returning -1' + return -1 + + self.t_stamps = timber_variable_ENABLE.t_stamps + self.enableLevelling = timber_variable_ENABLE.values + + + self.enableLevelling = np.array(np.float_(self.enableLevelling)).ravel() + self.t_stamps = np.array(np.float_(self.t_stamps)) + + def nearest_older_sample(self, t_obs, flag_return_time=False): + ind_min = np.argmin(np.abs(self.t_stamps - t_obs)) + if self.t_stamps[ind_min] > t_obs: + ind_min -= 1 + if flag_return_time: + if ind_min == -1: + return 0.*self.enableLevelling[ind_min], -1 + else: + return self.enableLevelling[ind_min], self.t_stamps[ind_min] + else: + if ind_min == -1: + return 0.*self.enableLevelling[ind_min] + else: + return self.enableLevelling[ind_min] + + + +def get_variable_dict(): + var_dict = {} + var_dict['LUMI_LEVEL_ENABLE_IP1'] = 'LUMILEVELING.IP1:ENABLE' + var_dict['LUMI_LEVEL_ENABLE_IP5'] = 'LUMILEVELING.IP5:ENABLE' + + return var_dict + +def variable_list(): + var_list = [] + var_list += get_variable_dict().values() + + return var_list diff --git a/lhc_log_db_query_repeat.py b/lhc_log_db_query_repeat.py new file mode 100644 index 0000000..8c6abba --- /dev/null +++ b/lhc_log_db_query_repeat.py @@ -0,0 +1,44 @@ +import os +import time + +def UnixTimeStamp2UTCTimberTimeString(t): + return time.strftime('%Y-%m-%d %H:%M:%S.000', time.gmtime(t)) + +def dbquery(varlist, t_start, t_stop, filename): + + if type(t_start) is not str: + t_start_str_UTC = UnixTimeStamp2UTCTimberTimeString(t_start) + else: + t_start_str_UTC = t_start + + if type(t_stop) is not str: + t_stop_str_UTC = UnixTimeStamp2UTCTimberTimeString(t_stop) + else: + t_stop_str_UTC = t_stop + + if type(varlist) is not list: + raise TypeError + + varlist_str = '' + for var in varlist: + varlist_str += var+',' + varlist_str = varlist_str[:-1] + + currpath = os.path.dirname(os.path.abspath(__file__)) + + execut = 'java -jar %s/accsoft-cals-extr-client-nodep.jar '%(currpath) + config = ' -C %s/ldb_UTC.conf '%(currpath) + time_interval = ' -t1 "'+ t_start_str_UTC +'" -t2 "'+t_stop_str_UTC+'"' + variables = '-vs "%s"'%(varlist_str) + repeat = ' -sa REPEAT -ss 1 -si MINUTE' + + if filename[0] == "/": + outpfile = ' -N '+filename + else: + outpfile = ' -N .//'+filename + + command = execut+config+variables+time_interval+repeat+outpfile + + print command + + os.system(command)