FunctionTransformer 的問題

包含 FunctionTransformer 的管線無法自動轉換為 ONNX,因為沒有任何轉換器(converter)能夠將任意的自訂 Python 程式碼轉換為 ONNX;必須為它特別編寫自訂轉換器。

初始嘗試

一個非常簡單的管線,以及首次嘗試將其轉換為 ONNX。

import numpy as np
from numpy.testing import assert_allclose
from onnx.version_converter import convert_version
from pandas import DataFrame
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import FunctionTransformer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from skl2onnx import to_onnx

# For the custom converter
from skl2onnx import update_registered_converter
from skl2onnx.common.utils import check_input_and_output_numbers
from skl2onnx.algebra.onnx_ops import OnnxSlice, OnnxSub, OnnxDiv, OnnxMul, OnnxCastLike
from skl2onnx.helpers import add_onnx_graph
import onnxscript
from onnxscript import opset18 as op

# To check discrepancies
from onnx.reference import ReferenceEvaluator
from onnxruntime import InferenceSession


def calculate_growth(df):
    """Return a copy of ``df`` extended with the growth percentage column ``c``.

    ``c`` is computed as ``100 * (a - b) / b``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame providing the columns ``a`` and ``b``.

    Returns
    -------
    pandas.DataFrame
        A new frame holding every column of ``df`` plus ``c``. The input
        frame itself is left untouched.
    """
    # ``assign`` builds a new frame, so the caller's DataFrame does not
    # silently gain (or overwrite) a column "c".
    return df.assign(c=100 * (df["a"] - df["b"]) / df["b"])


# Columns "a" and "b" go through calculate_growth (wrapped in a
# FunctionTransformer); the remaining columns are passed through unchanged.
mapper = ColumnTransformer(
    transformers=[
        ("c", FunctionTransformer(calculate_growth), ["a", "b"]),
    ],
    remainder="passthrough",
    verbose_feature_names_out=False,
)
# The transformer is configured to output pandas objects.
mapper.set_output(transform="pandas")

# Two steps: the column mapper above followed by a decision tree classifier.
pipe = Pipeline([("mapper", mapper), ("classifier", DecisionTreeClassifier())])

# Toy dataset: four rows with the integer features a, b and f.
data = DataFrame(
    [
        dict(a=2, b=1, f=5),
        dict(a=50, b=4, f=10),
        dict(a=5, b=2, f=4),
        dict(a=100, b=6, f=20),
    ]
)
# One binary label per row.
y = np.array([0, 1, 0, 1], dtype=np.int64)
pipe.fit(data, y)

# First conversion attempt. Any exception is caught on purpose so that its
# message can be printed instead of stopping the script.
try:
    to_onnx(pipe, data[:1], options={"zipmap": False})
except Exception as e:
    print("It does not work:", e)
It does not work: FunctionTransformer is not supported unless the transform function is None (= identity). You may raise an issue at https://github.com/onnx/sklearn-onnx/issues.

使用自訂轉換器

如果把 FunctionTransformer 改寫成自訂的 scikit-learn 轉換元件(transformer),為它編寫自訂的 ONNX 轉換器(converter)會容易得多。

class GrowthCalculator(BaseEstimator, TransformerMixin):
    """Stateless transformer returning the growth percentage of ``a`` over ``b``.

    The input must expose the columns ``a`` and ``b``; the output is a single
    column holding ``100 * (a - b) / b``.
    """

    def __init__(self):
        pass

    def calculate_growth(self, x, y):
        """Return ``100 * (x - y) / y``."""
        return 100 * (x - y) / y

    def fit(self, X, y=None):
        """Do nothing and return ``self``: there is no state to learn."""
        return self

    def transform(self, X, y=None):
        """Compute the growth column.

        Parameters
        ----------
        X : pandas.DataFrame
            Frame with the columns ``a`` and ``b``.
        y : ignored

        Returns
        -------
        numpy.ndarray
            Array reshaped to ``(-1, 1)``: one growth value per row.
        """
        # Column-wise arithmetic computes the same values as a row-wise
        # ``apply`` without running a Python lambda for every row.
        growth = self.calculate_growth(X["a"], X["b"])
        return growth.values.reshape((-1, 1))


# Same preprocessing as before, expressed with the custom transformer.
mapper = ColumnTransformer(
    transformers=[
        ("ab", FunctionTransformer(), ["a", "b"]),  # Identity: columns a and b are kept as they are.
        ("c", GrowthCalculator(), ["a", "b"]),  # We add a new one.
    ],
    remainder="passthrough",
    verbose_feature_names_out=False,
)

# Second pipeline, fitted on the same data and labels as `pipe`.
pipe_tr = Pipeline([("mapper", mapper), ("classifier", DecisionTreeClassifier())])
pipe_tr.fit(data, y)
Pipeline(steps=[('mapper',
                 ColumnTransformer(remainder='passthrough',
                                   transformers=[('ab', FunctionTransformer(),
                                                  ['a', 'b']),
                                                 ('c', GrowthCalculator(),
                                                  ['a', 'b'])],
                                   verbose_feature_names_out=False)),
                ('classifier', DecisionTreeClassifier())])
在 Jupyter 環境中,請重新執行此儲存格以顯示 HTML 表示法或信任筆記本。
在 GitHub 上,HTML 表示法無法呈現,請嘗試使用 nbviewer.org 載入此頁面。


兩個管線都會傳回相同的輸出。

# The probabilities predicted by both pipelines must match.
assert_allclose(pipe.predict_proba(data), pipe_tr.predict_proba(data))

讓我們檢查它是否產生相同數量的特徵。

# Compares the outputs of the first step (the column mapper) of both pipelines.
assert_allclose(pipe.steps[0][-1].transform(data), pipe_tr.steps[0][-1].transform(data))

但轉換仍然失敗,並顯示不同的錯誤訊息。

# Second conversion attempt, before any converter is registered for
# GrowthCalculator. The exception message is printed instead of propagating.
try:
    to_onnx(pipe_tr, data[:1], options={"zipmap": False})
except Exception as e:
    print("It does not work:", e)
It does not work: Unable to find a shape calculator for type '<class '__main__.GrowthCalculator'>'.
It usually means the pipeline being converted contains a
transformer or a predictor with no corresponding converter
implemented in sklearn-onnx. If the converted is implemented
in another library, you need to register
the converted so that it can be used by sklearn-onnx (function
update_registered_converter). If the model is not yet covered
by sklearn-onnx, you may raise an issue to
https://github.com/onnx/sklearn-onnx/issues
to get the converter implemented or even contribute to the
project. If the model is a custom model, a new converter must
be implemented. Examples can be found in the gallery.

自訂轉換器

我們需要在 ONNX 中實作方法 calculate_growth。第一個函式會傳回預期的類型和形狀。

def growth_shape_calculator(operator):
    """Declare the output of GrowthCalculator.

    The output reuses the tensor type of the single input and has the shape
    ``[first dimension of the input, 1]``.
    """
    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1)
    source = operator.inputs[0]
    # The transformer works on any numerical type: reuse the input's tensor class.
    tensor_cls = source.type.__class__
    # The first dimension is usually dynamic (batch dimension).
    n_rows = source.get_first_dimension()
    operator.outputs[0].type = tensor_cls([n_rows, 1])


def growth_converter(scope, operator, container):
    """Add the ONNX nodes computing ``100 * (a - b) / b`` to the graph.

    Parameters
    ----------
    scope : object
        Conversion scope, forwarded to ``add_to``.
    operator : object
        Operator being converted. ``operator.inputs[0]`` is the input tensor
        whose columns 0 and 1 are used as ``a`` and ``b``;
        ``operator.outputs[0]`` names the result.
    container : object
        Provides ``target_opset`` and receives the nodes through ``add_to``.
    """
    # The estimator is not trained, so operator.raw_operator is not needed.
    opv = container.target_opset
    X = operator.inputs[0]

    zero = np.array([0], dtype=np.int64)
    one = np.array([1], dtype=np.int64)
    two = np.array([2], dtype=np.int64)
    hundred = np.array([100], dtype=np.float32)

    # Slice(data, starts, ends, axes): column 0 and column 1 along axis 1.
    x0 = OnnxSlice(X, zero, one, one, op_version=opv)
    x1 = OnnxSlice(X, one, two, one, op_version=opv)

    # Multiply before dividing, exactly like the Python expression
    # ``100 * (x - y) / y`` which evaluates as ``(100 * (x - y)) / y``.
    # NOTE(review): if X is an integer tensor, Div is presumably an integer
    # division in the runtime; multiplying first limits the truncation error.
    scaled = OnnxMul(
        OnnxCastLike(hundred, X, op_version=opv),  # 100 in the element type of X
        OnnxSub(x0, x1, op_version=opv),
        op_version=opv,
    )
    z = OnnxDiv(
        scaled,
        x1,
        op_version=opv,
        output_names=operator.outputs[0],
    )
    z.add_to(scope, container)


# Associate GrowthCalculator with its shape calculator and its converter
# under the alias "AliasGrowthCalculator".
update_registered_converter(
    GrowthCalculator,
    "AliasGrowthCalculator",
    growth_shape_calculator,
    growth_converter,
)


# Conversion with the registered converter; the first row of `data` is
# passed as the sample input.
onx = to_onnx(pipe_tr, data[:1], target_opset=18, options={"zipmap": False})

讓我們檢查是否沒有差異

首先是預期的值

# Reference results from scikit-learn: (predicted labels, predicted probabilities).
expected = (pipe_tr.predict(data), pipe_tr.predict_proba(data))
print(expected)
(array([0, 1, 0, 1]), array([[1., 0.],
       [0., 1.],
       [1., 0.],
       [0., 1.]]))

然後,讓我們使用 onnx.reference.ReferenceEvaluator 檢查。

# One entry per column, each reshaped into a single-column 2-D array.
feeds = {
    "a": data["a"].values.reshape((-1, 1)),
    "b": data["b"].values.reshape((-1, 1)),
    "f": data["f"].values.reshape((-1, 1)),
}

# verbose=10 to show intermediate results
ref = ReferenceEvaluator(onx, verbose=0)
got = ref.run(None, feeds)

# First output is compared with the labels, second with the probabilities.
assert_allclose(expected[0], got[0])
assert_allclose(expected[1], got[1])

然後使用用於部署的執行階段,例如 onnxruntime。

# Same check with onnxruntime on the CPU provider; `ref` and `got` are reused.
ref = InferenceSession(onx.SerializeToString(), providers=["CPUExecutionProvider"])
got = ref.run(None, feeds)

assert_allclose(expected[0], got[0])
assert_allclose(expected[1], got[1])

使用 onnxscript 的自訂轉換器

onnxscript 提供的 API 比 onnx 套件實作的 API 更簡潔。讓我們看看如何使用它來編寫轉換器。

@onnxscript.script()
def calculate_onnxscript_verbose(X):
    # Every operator is called explicitly through `op` (onnxscript.opset18).
    # Slice(data, starts, ends, axes): column 0, then column 1, along axis 1.
    x0 = op.Slice(X, [0], [1], [1])
    x1 = op.Slice(X, [1], [2], [1])
    # (x0 - x1) / x1 * 100
    return op.Mul(op.Div(op.Sub(x0, x1), x1), 100)

此版本嚴格依照 ONNX 運算子的定義來撰寫。如果改用一般的 Python 運算子,程式碼可以更簡單。並非所有 Python 運算子都能轉換為 ONNX;遇到無法轉換的運算子時,會引發錯誤訊息。

@onnxscript.script()
def calculate_onnxscript(X):
    # onnxscript must define an opset. We use an identity node
    # from a specific opset to set it (otherwise it fails).
    xi = op.Identity(X)
    # Column 0, then every column from index 1 on.
    x0 = xi[:, :1]
    x1 = xi[:, 1:]
    # Regular python operators are used here instead of explicit op.* calls.
    return (x0 - x1) / x1 * 100

我們也可以檢查它是否等效於 python 實作。

# Pass a copy so that `data`, which is reused below for the conversion,
# can never gain a column "c" as a side effect of this check.
f_expected = calculate_growth(data.copy())["c"].values
# The onnxscript function is evaluated on columns a and b cast to float32.
f_got = calculate_onnxscript(data[["a", "b"]].values.astype(np.float32))
assert_allclose(f_expected.ravel(), f_got.ravel(), atol=1e-6)

讓我們在轉換器中使用它。

def growth_converter_onnxscript(scope, operator, container):
    """Converter for GrowthCalculator built on the onnxscript implementation.

    The model produced by ``calculate_onnxscript`` is converted to the opset
    requested by the container and then added to the graph under
    construction with ``add_onnx_graph``.
    """
    # The estimator is not trained: operator.raw_operator is not needed.
    target = container.target_opset

    # calculate_onnxscript is written with opset 18; the resulting model is
    # converted to the opset required by the user before being inserted.
    model = convert_version(calculate_onnxscript.to_model_proto(), target)
    add_onnx_graph(scope, operator, container, model)


# Register again under the same alias, this time with the onnxscript-based
# converter; the shape calculator is the same as before.
update_registered_converter(
    GrowthCalculator,
    "AliasGrowthCalculator",
    growth_shape_calculator,
    growth_converter_onnxscript,
)

讓我們檢查它是否有效。

# Convert the pipeline again; `onx` is overwritten with the new model.
onx = to_onnx(pipe_tr, data[:1], target_opset=18, options={"zipmap": False})

再次檢查差異。

# Run the new model with the same feeds and compare with the scikit-learn results.
ref = ReferenceEvaluator(onx, verbose=0)
got = ref.run(None, feeds)
assert_allclose(expected[0], got[0])
assert_allclose(expected[1], got[1])

最後。

# Reached only when every check above passed.
print("done.")
done.

腳本的總執行時間: (0 分鐘 0.222 秒)

由 Sphinx-Gallery 產生的展示