Перейти к содержанию

Модуль vmResource

Модуль позволяет работать с соединением с базой данных внутри блока Python.

Импорт модуля

from calculator.system import vmResource

Доступные методы

execute

Позволяет выполнить запрос используя открытое соединение выбранной базы данных.

execute(id_connection, *args, **kwargs)

Поддерживает: ClickHouse, MsSQL, Oracle, Postgres
Не поддерживает: Kafka, Mongo, SAP, SOAP

Параметры:

  • id_connection (Строка) - id открытого соединения;
  • sql_text (Строка) - запрос на получение данных;
  • sql_params (Список или Справочник) - параметры запроса;
  • Дополнительные флаги:
    • dict_return - возвращение тела запроса в виде словаря (True|False);
      • только для MsSQL, Oracle, Postgres;
    • show_description - вывод описание полей (True|False);
    • one_row_return - вывод только одной строки запроса (True|False);
    • null_return_value - значение, возвращаемое при пустом запросе;
    • final_commit - коммит после завершения запроса (True|False);

Флаг final_commit временно не работает. Выполнение транзакций с его помощью невозможно.

Пример
from calculator.system import vmResource

id_connect =  execution_context.inputs[0].value
sql = 'SELECT * FROM t_table WHERE name = %(name)s and model_name = %(model_name)s'
keys = {"name": "Пример", "model_name": "Модель"}

data = vmResource.execute(id_connect, sql, keys, final_commit=True)

package_insert

Позволяет выполнить пакетное сохранение данных, передаваемых в виде массива значений. Используется открытое соединение выбранной базы данных.

package_insert(id_connection, *args, **kwargs)

Поддерживает: MsSQL, Oracle, Postgres
Не поддерживает: ClickHouse, Kafka, Mongo, SAP, SOAP

Параметры:

  • id_connection (Строка) - id открытого соединения
  • sql_text (Строка) - запрос на вставку данных
  • sql_params (Список) - данные
  • дополнительные флаги
    • template - форма, по которым будут вставляться данные в бд
    • final_commit - коммит после завершения запроса (True|False)

Флаг final_commit временно не работает. Выполнение транзакций с его помощью невозможно.

Пример
from calculator.system import vmResource

id_connect =  execution_context.inputs[0].value

sql = """
    INSERT INTO t_table (
        column1, column2, vl
    ) 
    VALUES %s
"""

rows = [['Название 1', 'Атрибут 1', 10], ['Название 1', 'Атрибут 2', 100], ['Название 2', 'Атрибут 1', 1]]

vmResource.package_insert(id_connect, sql_text=sql, sql_params=rows, final_commit=True)
Пример вставки, с данными в виде списка
from calculator.system import vmResource

id_connect =  execution_context.inputs[0].value

sql = """
    INSERT INTO t_table (
        column1, column2, vl
    ) 
    VALUES %s
"""

# формат данных, по которым будут вставляться данные в бд
column_formats = '(%s::varchar, %s, %s::int)'

rows = [['Название 1', 'Атрибут 1', 10], ['Название 1', 'Атрибут 2', 100], ['Название 2', 'Атрибут 1', 1]]

vmResource.package_insert(id_connect, sql_text=sql, sql_params=rows, template=column_formats, final_commit=True)
Пример вставки, с данными в виде справочника
from calculator.system import vmResource

id_connect =  execution_context.inputs[0].value

sql = """
    INSERT INTO t_table (
        column1, column2, vl
    ) 
    VALUES %s
"""

# формат данных, по которым будут вставляться данные в бд
column_formats = "(%(CITY_ID)s, NULLIF(%(TYPE_NAME)s, '')::varchar, %(TYPE_ID)s::int)"

rows = [{"CITY_ID": "Название 1", "TYPE_NAME": "Атрибут 1", "TYPE_ID": 10}, {"CITY_ID": "Название 1", "TYPE_NAME": "Атрибут 2", "TYPE_ID": 100}, {"CITY_ID": "Название 2", "TYPE_NAME": "Атрибут 1", "TYPE_ID": 1}]

vmResource.package_insert(id_connect, sql_text=sql, sql_params=rows, template=column_formats, final_commit=True)

Работа с Clickhouse

get_clickhouse

Получает класс для работы с ClickHouse

get_clickhouse(id_connection)

execute

Выполнение запроса к Clickhouse.

execute(query, **kwargs) 

Пример вызова метода
from calculator.system import vmResource
conn_id = execution_context.inputs[0].value
click_house = vmResource.get_clickhouse(conn_id)

sql = 'select * from default.ora_vw_aff_channels'
res = click_house.execute(query=sql)

execution_context.log.info(res)

execute_iter

Создание итератора для получения данных

execute_iter(query, **kwargs) 

Пример вызова метода
from calculator.system import vmResource
conn_id = execution_context.inputs[0].value
click_house = vmResource.get_clickhouse(conn_id)

sql = 'select * from default.ora_vw_aff_channels'
settings = {'max_block_size': 100}
click_house.execute_iter(query=sql, settings=settings)

for row in click_house:
    execution_context.log.info(row)

execute_with_progress

Запуск сложного запроса, с возможностью его остановить.

execute_with_progress(query, **kwargs)

  • query - текст запроса;
  • Дополнительные параметры можно посмотреть в документации ClickHouse;
  • После того как запрос выполнился надо вызвать get_result().
Пример вызова метода
from calculator.system import vmResource
conn_id = execution_context.inputs[0].value
click_house = vmResource.get_clickhouse(conn_id)

sql = 'select * from default.ora_vw_aff_channels'
click_house.execute_with_progress(query=sql)
for num_rows, total_rows in click_house:
    execution_context.log.info(num_rows, total_rows)

execution_context.log.info(click_house.get_result())

query_dataframe

Выполнение запроса, а результат в виде DataFrame.

query_dataframe(query, **kwargs)

Работает, если в блоке соединения с ClickHouse в дополнительных параметрах установлен флаг use_numpy. Пример параметра {"use_numpy": true}

Пример вызова метода
import pandas as pd
import numpy as np
from calculator.system import vmResource

conn_id = execution_context.inputs[0].value
click_house = vmResource.get_clickhouse(conn_id)

sql = 'SELECT * FROM tutorial.test'
res = click_house.query_dataframe(query=sql)

execution_context.log.info(str(res))

insert_dataframe

Вставка данных, где данные в виде DataFrame.

insert_dataframe(query, dataframe, **kwargs) 
  • query - текст запроса
  • dataframe - данные в формате DataFrame
  • Дополнительные параметры можно посмотреть в документации ClickHouse

Работает если в блоке соединения с ClickHouse в дополнительных параметрах установлен флаг use_numpy. Пример параметра: {"use_numpy": true}

Пример вызова метода
import pandas as pd
import numpy as np
from calculator.system import vmResource

conn_id = execution_context.inputs[0].value
click_house = vmResource.get_clickhouse(conn_id)

df = pd.DataFrame({
    'a': [1, None, None],
    'b': [1.0, None, np.nan],
    'c': ['a', None, np.nan],
}, dtype=object)

sql = 'INSERT INTO tutorial.test VALUES'
click_house.insert_dataframe(sql, df)

Из-за специфики, данный метод не работает при локальной разработке.

create_file_name

Формирует уникальное имя файла (переданное имя файла + текущее время) и возвращает полный путь до этого файла на сервере.

create_file_name(name, ext)

Параметры: - name (Строка) - наименование файла; - ext(Строка) - расширение файла;

Из-за специфики формирования пути на сервере, данный метод не работает при локальной разработке.

Пример
from calculator.system import vmResource

f = vmResource.create_file_name(name='test_file_6', ext='docx')

Работа с курсором

get_cursor

Получить класс для работы с курсором. При получении класса создается курсор.

get_cursor(id_connection, **kwargs)

Методы:

  • execute(query, **kwargs) - выполнение запроса;
  • query - текст запроса;
  • Дополнительные параметры:
    • fetchmany(size) - получение непрочитанных N строк результата запроса;
    • size - количество строк;
    • fetchall() - получение всех строк результата запроса;
    • close() - закрытие курсора.

Свойства:

  • rowcount - получение количества строк. Для каждой БД работает по разному:
    • Postgres - количество строк в запросе выполняемого в курсоре;
    • Oracle - сколько строк было вычитано из курсора;
    • MsSQL - сколько строк было изменено в курсоре.
Пример
from calculator.system import vmResource

conn_id = execution_context.get_input_by_id("id_db").value

cursor = vmResource.get_cursor(conn_id)
cursor.execute('select 1 union all select 2 union all select 3 union all select 4 union all select 5')
execution_context.log.info('rowcount = ' + str(cursor.rowcount))
execution_context.log.info('cursor1 = ' + str(cursor.fetchmany(2)))

cursor2 = vmResource.get_cursor(id_db)
cursor2.execute('select 3')
execution_context.log.info('cursor2 = ' + str(cursor2.fetchmany(2)))

execution_context.log.info('cursor1 = ' + str(cursor.fetchmany(2)))

cursor2.close()
cursor.close()

Oбработка исключений

В связи с тем, что на сервере происходит обработка ошибок, то через обычный except Exception не получится получить текст ошибки, т.к. он будет считан в CException.

Для получения текста ошибки нужно воспользоваться классом CException и получить атрибут message.

Пример обработки ошибки
from calculator.system import vmResource
from core.errors import CException

id_connect =  execution_context.inputs[0].value
sql = 'SELECT * FROM t_table WHERE name = %(name)s and model_name = %(model_name)s'
keys = {"name": "Пример", "model_name": "Модель"}

try:
    data = vmResource.execute(id_connect, sql, keys, final_commit=True)
except CException as e:
    execution_context.log.error(e.message)