Модуль vmResource
Модуль позволяет работать с соединением с базой данных внутри блока Python.
Импорт модуля
Доступные методы
execute
Позволяет выполнить запрос используя открытое соединение выбранной базы данных.
Поддерживает: ClickHouse, MsSQL, Oracle, Postgres
Не поддерживает: Kafka, Mongo, SAP, SOAP
Параметры:
id_connection(Строка) - id открытого соединения;sql_text(Строка) - запрос на получение данных;sql_params(Список или Справочник) - параметры запроса;- Дополнительные флаги:
dict_return- возвращение тела запроса в виде словаря (True|False);- только для MsSQL, Oracle, Postgres;
show_description- вывод описание полей (True|False);one_row_return- вывод только одной строки запроса (True|False);null_return_value- значение, возвращаемое при пустом запросе;final_commit- коммит после завершения запроса (True|False);
Флаг final_commit временно не работает. Выполнение транзакций с его помощью невозможно.
Пример
package_insert
Позволяет выполнить пакетное сохранение данных, передаваемых в виде массива значений. Используется открытое соединение выбранной базы данных.
Поддерживает: MsSQL, Oracle, Postgres
Не поддерживает: ClickHouse, Kafka, Mongo, SAP, SOAP
Параметры:
id_connection(Строка) - id открытого соединенияsql_text(Строка) - запрос на вставку данныхsql_params(Список) - данные- дополнительные флаги
template- форма, по которым будут вставляться данные в бдfinal_commit- коммит после завершения запроса (True|False)
Флаг final_commit временно не работает. Выполнение транзакций с его помощью невозможно.
Пример
from calculator.system import vmResource
id_connect = execution_context.inputs[0].value
sql = """
INSERT INTO t_table (
column1, column2, vl
)
VALUES %s
"""
rows = [['Название 1', 'Атрибут 1', 10], ['Название 1', 'Атрибут 2', 100], ['Название 2', 'Атрибут 1', 1]]
vmResource.package_insert(id_connect, sql_text=sql, sql_params=rows, final_commit=True)
Пример вставки, с данными в виде списка
from calculator.system import vmResource
id_connect = execution_context.inputs[0].value
sql = """
INSERT INTO t_table (
column1, column2, vl
)
VALUES %s
"""
# формат данных, по которым будут вставляться данные в бд
column_formats = '(%s::varchar, %s, %s::int)'
rows = [['Название 1', 'Атрибут 1', 10], ['Название 1', 'Атрибут 2', 100], ['Название 2', 'Атрибут 1', 1]]
vmResource.package_insert(id_connect, sql_text=sql, sql_params=rows, template=column_formats, final_commit=True)
Пример вставки, с данными в виде справочника
from calculator.system import vmResource
id_connect = execution_context.inputs[0].value
sql = """
INSERT INTO t_table (
column1, column2, vl
)
VALUES %s
"""
# формат данных, по которым будут вставляться данные в бд
column_formats = "(%(CITY_ID)s, NULLIF(%(TYPE_NAME)s, '')::varchar, %(TYPE_ID)s::int)"
rows = [{"CITY_ID": "Название 1", "TYPE_NAME": "Атрибут 1", "TYPE_ID": 10}, {"CITY_ID": "Название 1", "TYPE_NAME": "Атрибут 2", "TYPE_ID": 100}, {"CITY_ID": "Название 2", "TYPE_NAME": "Атрибут 1", "TYPE_ID": 1}]
vmResource.package_insert(id_connect, sql_text=sql, sql_params=rows, template=column_formats, final_commit=True)
Работа с Clickhouse
get_clickhouse
Получает класс для работы с ClickHouse
execute
Выполнение запроса к Clickhouse.
query- текст запроса Дополнительные параметры можно посмотреть в документации ClickHouse
Пример вызова метода
execute_iter
Создание итератора для получения данных
query- текст запроса- Дополнительные параметры можно посмотреть в документации ClickHouse
Пример вызова метода
from calculator.system import vmResource
conn_id = execution_context.inputs[0].value
click_house = vmResource.get_clickhouse(conn_id)
sql = 'select * from default.ora_vw_aff_channels'
settings = {'max_block_size': 100}
click_house.execute_iter(query=sql, settings=settings)
for row in click_house:
execution_context.log.info(row)
execute_with_progress
Запуск сложного запроса, с возможностью его остановить.
query- текст запроса;- Дополнительные параметры можно посмотреть в документации ClickHouse;
- После того как запрос выполнился надо вызвать
get_result().
Пример вызова метода
from calculator.system import vmResource
conn_id = execution_context.inputs[0].value
click_house = vmResource.get_clickhouse(conn_id)
sql = 'select * from default.ora_vw_aff_channels'
click_house.execute_with_progress(query=sql)
for num_rows, total_rows in click_house:
execution_context.log.info(num_rows, total_rows)
execution_context.log.info(click_house.get_result())
query_dataframe
Выполнение запроса, а результат в виде DataFrame.
query- текст запроса- Дополнительные параметры можно посмотреть в документации ClickHouse
Работает, если в блоке соединения с ClickHouse в дополнительных параметрах установлен флаг use_numpy. Пример
параметра {"use_numpy": true}
Пример вызова метода
insert_dataframe
Вставка данных, где данные в виде DataFrame.
query- текст запросаdataframe- данные в формате DataFrame- Дополнительные параметры можно посмотреть в документации ClickHouse
Работает если в блоке соединения с ClickHouse в дополнительных параметрах установлен флаг use_numpy.
Пример параметра: {"use_numpy": true}
Пример вызова метода
import pandas as pd
import numpy as np
from calculator.system import vmResource
conn_id = execution_context.inputs[0].value
click_house = vmResource.get_clickhouse(conn_id)
df = pd.DataFrame({
'a': [1, None, None],
'b': [1.0, None, np.nan],
'c': ['a', None, np.nan],
}, dtype=object)
sql = 'INSERT INTO tutorial.test VALUES'
click_house.insert_dataframe(sql, df)
Из-за специфики, данный метод не работает при локальной разработке.
create_file_name
Формирует уникальное имя файла (переданное имя файла + текущее время) и возвращает полный путь до этого файла на сервере.
Параметры:
- name (Строка) - наименование файла;
- ext(Строка) - расширение файла;
Из-за специфики формирования пути на сервере, данный метод не работает при локальной разработке.
Пример
Работа с курсором
get_cursor
Получить класс для работы с курсором. При получении класса создается курсор.
Методы:
execute(query, **kwargs)- выполнение запроса;query- текст запроса;- Дополнительные параметры:
fetchmany(size)- получение непрочитанных N строк результата запроса;size- количество строк;fetchall()- получение всех строк результата запроса;close()- закрытие курсора.
Свойства:
rowcount- получение количества строк. Для каждой БД работает по разному:- Postgres - количество строк в запросе выполняемого в курсоре;
- Oracle - сколько строк было вычитано из курсора;
- MsSQL - сколько строк было изменено в курсоре.
Пример
from calculator.system import vmResource
conn_id = execution_context.get_input_by_id("id_db").value
cursor = vmResource.get_cursor(conn_id)
cursor.execute('select 1 union all select 2 union all select 3 union all select 4 union all select 5')
execution_context.log.info('rowcount = ' + str(cursor.rowcount))
execution_context.log.info('cursor1 = ' + str(cursor.fetchmany(2)))
cursor2 = vmResource.get_cursor(id_db)
cursor2.execute('select 3')
execution_context.log.info('cursor2 = ' + str(cursor2.fetchmany(2)))
execution_context.log.info('cursor1 = ' + str(cursor.fetchmany(2)))
cursor2.close()
cursor.close()
Oбработка исключений
В связи с тем, что на сервере происходит обработка ошибок, то через обычный except Exception не получится получить
текст ошибки, т.к. он будет считан в CException.
Для получения текста ошибки нужно воспользоваться классом CException и получить атрибут message.
Пример обработки ошибки
from calculator.system import vmResource
from core.errors import CException
id_connect = execution_context.inputs[0].value
sql = 'SELECT * FROM t_table WHERE name = %(name)s and model_name = %(model_name)s'
keys = {"name": "Пример", "model_name": "Модель"}
try:
data = vmResource.execute(id_connect, sql, keys, final_commit=True)
except CException as e:
execution_context.log.error(e.message)