суббота, 22 января 2022 г.

Удаленный запуск SAP DS job используя Python

В этой статье расскажу как встроить инструмент SAP DS в потоки Airflow.

Если у вас в компании много SAP, но не хватило денег на Hana Enterprise, то единственный способ копирования данных во вне - это использовать SAP DS через сервера приложений SAP.
Минусы этой схемы - это старый, неудобный инструмент и что запуски DS нельзя встроить в потоки Airflow.
От неудобного интерфейса и nocode никуда не деться, но вынесем хотя бы запуски job заданий SAP DS внутрь Airflow.



Настройка DS Job как web service

SAP DS можно использовать как web service провайдер
для этого нужно сделать нескольк манипуляций над вашим job:
1. Опубликовать как web сервис

2. Присвоить метку для сервиса (там же выбрать какие методы вам нужны)
3. Выгрузить WSDL сервиса по метке
После этого, используя WSDL можно сгенерировать java api код, но можно пойти другим путем, если мы хотим обращаться к сервису из Python
Для этого скачиваем утилиту SOAP UI, загружаем в нее WSDL и она генерирует нам все наборы методов и примеры для них.
Остается только выбрать нужные нам и обернуть их в post запросы внутри python.

Реализуем класс DSAPI, где опишем методы логина/логаута, запуска задания, просмотра его статуса, ошибки и трейса:


Логин и получение ID сессии

def logon(self, logon_xml):
В xml операции logon, дополнительно добавлена информация о адерсе сервера SAP DS.
Это обусловлено внутренним подходом в хранении настроек подключения (удобней сохранить целиком XML со всеми настройками):
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://www.businessobjects.com/DataServices/ServerX.xsd">
    <soapenv:Header>
      <host>http://10.2.110.73:8080/DataServices/servlet/webservices?ver=2.1</host>
   </soapenv:Header>
   <soapenv:Body>
      <ser:LogonRequest>
         <username>hadoop</username>
         <password>***</password>
         <cms_system>of-sapdsdw-02</cms_system>
         <cms_authentication>secEnterprise</cms_authentication>
      </ser:LogonRequest>
   </soapenv:Body>
</soapenv:Envelope>

Запуск JOB

Полученный токен и host из logon добавляется ко всем API вызовам.
Для запуска job нужно его название и xml с параметрами
def run(self, host, session_id, job_name, job_xml):
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://www.businessobjects.com/DataServices/ServerX.xsd">
   <soapenv:Header>
      <ser:session>
         <SessionID>$1</SessionID>
      </ser:session>
   </soapenv:Header>
   <soapenv:Body>
      <ser:JB_API_Job>
         <job_parameters>
            <job_system_profile>dev</job_system_profile>
            <job_server>JobServer_Win</job_server>
            <trace>Session</trace>
            <trace>Workflow</trace>
            <trace>Dataflow</trace>
         </job_parameters>
      </ser:JB_API_Job>
   </soapenv:Body>
</soapenv:Envelope>
Метод вернет ID процесса JOB и имя репозитория, где он находился.
Так же стоит заметить, что этот метод не дожидается полного окончания job, а только его запускает.

Мониторинг Jobs

несколько аналогиных по набору параметров методы:
Все они запрашивают ID процесса JOB и имя репозитория и возвращают информацию:
# получить статус задания
def getStatus(self, host, session_id, run_id, repo_name): 
#получить ошибку, если она была
def getError(self, host, session_id, run_id, repo_name):
#получить трейс работы задания
def getTrace(self, host, session_id, run_id, repo_name):

Запуск Job с ожиданием окончания

Чтобы получить финальный статус задания и дождаться его полного выполнения сделан расширяющий класс DSJob
Класс принимет параметры: xml подключения, имя job, xml параметров job и реализует метод:
def runWait(self, timeout_seconds = 3600, debug_msg = True):
Который запускает задание и дожидается его выполнения или выпадает по таймауту (timeout_seconds)
Возвращает же Exception с из метода getError, если задание завершилось с ошибкой или таймаутом.
Или полный трейс (getTrace), если задание завершилось без ошибок.

Полный пример запуска задания с названием JOB_API

dj = DSJob("""<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://www.businessobjects.com/DataServices/ServerX.xsd">
    <soapenv:Header>
      <host>http://10.2.110.73:8080/DataServices/servlet/webservices?ver=2.1</host>
   </soapenv:Header>
   <soapenv:Body>
      <ser:LogonRequest>
         <username>hadoop</username>
         <password>***</password>
         <cms_system>of-sapdsdw-02</cms_system>
         <cms_authentication>secEnterprise</cms_authentication>
      </ser:LogonRequest>
   </soapenv:Body>
</soapenv:Envelope>""", 
"JB_API", 
"""<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://www.businessobjects.com/DataServices/ServerX.xsd">
   <soapenv:Header>
      <ser:session>
         <SessionID>$1</SessionID>
      </ser:session>
   </soapenv:Header>
   <soapenv:Body>
      <ser:JB_API_Job>
         <job_parameters>
            <job_system_profile>dev</job_system_profile>
            <job_server>JobServer_Win</job_server>
            <trace>Session</trace>
            <trace>Workflow</trace>
            <trace>Dataflow</trace>
         </job_parameters>
      </ser:JB_API_Job>
   </soapenv:Body>
</soapenv:Envelope>""")

dj.runWait()

del dj 

Комментариев нет:

Отправить комментарий