From fdb99654442519f1816ed161fcc9f9765f657297 Mon Sep 17 00:00:00 2001 From: Thiago Crepaldi Date: Sun, 14 Mar 2021 21:58:52 -0700 Subject: [PATCH] Add basic certificate APIs * List all installed certificate names * List all Self-signed certificate names * List all Let's Encrypt certificate names * List all available Services that can be associated to Certificates * Assign a Service to an existing Certificate --- src/synology_dsm/api/core/certificate.py | 120 +++++++++++++++++++++++ src/synology_dsm/synology_dsm.py | 27 ++++- 2 files changed, 144 insertions(+), 3 deletions(-) create mode 100644 src/synology_dsm/api/core/certificate.py diff --git a/src/synology_dsm/api/core/certificate.py b/src/synology_dsm/api/core/certificate.py new file mode 100644 index 00000000..a8f6b6ce --- /dev/null +++ b/src/synology_dsm/api/core/certificate.py @@ -0,0 +1,120 @@ +"""DSM Certificate data.""" + +import json + + +class SynoCoreCertificate: + """Class containing Certificate data.""" + + API_CERTIFICATE_KEY = "SYNO.Core.Certificate.CRT" + API_CERTIFICATE_SERVICE_KEY = "SYNO.Core.Certificate.Service" # method='set' + + def __init__(self, dsm): + """Constructor method.""" + self._dsm = dsm + self._data = {} + + def update(self): + """Updates certificate data.""" + raw_data = self._dsm.get(self.API_CERTIFICATE_KEY, "list") + if raw_data: + self._data = raw_data["data"] + + @property + def success(self): + """Gets the last scan success.""" + return self._data.get("success") + + @property + def all(self): + """Gets all certificate names.""" + cert_list = self._data.get("certificates") + return [cert['desc'] for cert in cert_list] + + @property + def self_signed(self): + """Gets only self-signed certificate names.""" + cert_list = self._data.get("certificates") + return [cert['desc'] for cert in cert_list if cert['issuer']['organization'] == 'Synology Inc.'] + + @property + def lets_encrypt(self): + """Gets only Let's Encrypt certificate names.""" + cert_list = self._data.get("certificates") + return [cert['desc'] for cert in cert_list if cert['issuer']['organization'] == "Let's Encrypt"] + + def _services_by_certificate(self, cert_name): + """Gets services associated with certificate `cert_name`""" + cert_list = self._data.get("certificates") + cert_list = [cert for cert in cert_list if cert['desc'] == cert_name] + if not cert_list: + return [] + cert = cert_list[0] + services = [serv for serv in cert['services']] + return services + + def services_by_certificate(self, cert_name): + services = [svc['service'] + for svc in self._services_by_certificate(cert_name)] + return services or [] + + def _services(self): + """Gets all services that can be associated with certificates""" + services = [] + for cert in self.all: + services += self._services_by_certificate(cert) + return services + + def services(self): + services = [svc['service'] for svc in self._services()] + return services or [] + + def _get_cert_id_by_cert_name(self, cert_name): + cert_id = [cert['id'] for cert in self._data.get( + "certificates") if cert['desc'] == cert_name] + if len(cert_id) != 1: + raise RuntimeError('Certificate not found') + return cert_id[0] + + def _get_cert_id_by_service_name(self, service_name): + for cert in self._data.get('certificates'): + for svc in cert['services']: + if svc['service'] == service_name: + return cert['id'] + raise RuntimeError( + 'No certificate was found associated to the specified service') + + def assign_certificate_to_service(self, cert_name, service_names): + """Sets certificate `cert_name` to all `service_name` services""" + + # Verify specified cert_name is valid + if cert_name is None or cert_name not in self.all: + raise RuntimeError('Invalid certificate name!') + + # Verify specified services are valid + all_services = self.services() + valid_services = [svc for svc in service_names if svc in all_services] + if not valid_services: + raise RuntimeError('Invalid service(s) name(s)!') + + # Prepare payload for the REST API + new_cert_id = self._get_cert_id_by_cert_name(cert_name) + services = self._services() + services = [{'service': svc, 'old_id': self._get_cert_id_by_service_name(svc['service']), 'id': new_cert_id, } + for svc in services if svc['service'] in valid_services] + params = {'settings': json.dumps(services)} + + # Final validation + for svc in services: + if not svc['service'] or not svc['old_id'] or not svc['id']: + raise RuntimeError('Malformed service configuration!') + if svc['old_id'] == svc['id']: + raise RuntimeError('Service {} is already using the specified certificate'. format( + svc['service']['display_name'])) + + # REST API call + res = self._dsm.post(api=self.API_CERTIFICATE_SERVICE_KEY, + method="set", + params=params) + + return res diff --git a/src/synology_dsm/synology_dsm.py b/src/synology_dsm/synology_dsm.py index 6b6a09eb..89ab2214 100644 --- a/src/synology_dsm/synology_dsm.py +++ b/src/synology_dsm/synology_dsm.py @@ -8,6 +8,7 @@ from requests.exceptions import RequestException from .api.core.security import SynoCoreSecurity +from .api.core.certificate import SynoCoreCertificate from .api.core.share import SynoCoreShare from .api.core.system import SynoCoreSystem from .api.core.upgrade import SynoCoreUpgrade @@ -73,6 +74,7 @@ def __init__( self._information = None self._network = None self._security = None + self._certificate = None self._share = None self._storage = None self._surveillance = None @@ -165,7 +167,8 @@ def login(self, otp_code: str = None) -> bool: } raise switcher.get( result["error"]["code"], - SynologyDSMLoginFailedException(result["error"]["code"], self.username), + SynologyDSMLoginFailedException( + result["error"]["code"], self.username), ) # Parse result if valid @@ -177,7 +180,8 @@ def login(self, otp_code: str = None) -> bool: # Not available on API version < 6 && device token is given once # per device_name self._device_token = result["data"]["did"] - self._debuglog("Authentication successful, token: " + str(self._session_id)) + self._debuglog("Authentication successful, token: " + + str(self._session_id)) if not self._information: self._information = SynoDSMInformation(self) @@ -303,7 +307,8 @@ def _execute_request(self, method: str, url: str, params: dict, **kwargs): if response.status_code == 200: # We got a DSM response - content_type = response.headers.get("Content-Type", "").split(";")[0] + content_type = response.headers.get( + "Content-Type", "").split(";")[0] if content_type in [ "application/json", @@ -334,6 +339,9 @@ def update(self, with_information: bool = False, with_network: bool = False): if self._security: self._security.update() + if self._certificate: + self._certificate.update() + if self._utilisation: self._utilisation.update() @@ -363,6 +371,9 @@ def reset(self, api: any) -> bool: if api == SynoCoreSecurity.API_KEY: self._security = None return True + if api == SynoCoreCertificate.API_KEY: + self._certificate = None + return True if api == SynoCoreShare.API_KEY: self._share = None return True @@ -387,6 +398,9 @@ def reset(self, api: any) -> bool: if isinstance(api, SynoCoreSecurity): self._security = None return True + if isinstance(api, SynoCoreCertificate): + self._certificate = None + return True if isinstance(api, SynoCoreShare): self._share = None return True @@ -438,6 +452,13 @@ def security(self) -> SynoCoreSecurity: self._security = SynoCoreSecurity(self) return self._security + @property + def certificate(self) -> SynoCoreCertificate: + """Gets NAS Certificate informations.""" + if not self._certificate: + self._certificate = SynoCoreCertificate(self) + return self._certificate + @property def share(self) -> SynoCoreShare: """Gets NAS shares information."""