注意
前往結尾下載完整範例程式碼。
FunctionTransformer 的問題¶
包含 FunctionTransformer 的管線無法自動轉換為 onnx,因為沒有轉換器能夠將自訂 python 程式碼轉換為 ONNX。 需要為它特別編寫自訂轉換器。
初始嘗試¶
一個非常簡單的管線,以及首次嘗試將其轉換為 ONNX。
import numpy as np
from numpy.testing import assert_allclose
from onnx.version_converter import convert_version
from pandas import DataFrame
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import FunctionTransformer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from skl2onnx import to_onnx
# For the custom converter
from skl2onnx import update_registered_converter
from skl2onnx.common.utils import check_input_and_output_numbers
from skl2onnx.algebra.onnx_ops import OnnxSlice, OnnxSub, OnnxDiv, OnnxMul, OnnxCastLike
from skl2onnx.helpers import add_onnx_graph
import onnxscript
from onnxscript import opset18 as op
# To check discrepancies
from onnx.reference import ReferenceEvaluator
from onnxruntime import InferenceSession
def calculate_growth(df):
    """Return a copy of ``df`` with the growth column ``c`` appended.

    ``c`` is computed as ``100 * (a - b) / b`` from the columns ``a`` and
    ``b`` of ``df``.

    The input frame is not modified: ``DataFrame.assign`` builds a new
    frame, so calling this function on a shared DataFrame does not add a
    column to it as a side effect.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding at least the columns ``a`` and ``b``.

    Returns
    -------
    pandas.DataFrame
        A new frame with the columns of ``df`` plus ``c``.
    """
    return df.assign(c=100 * (df["a"] - df["b"]) / df["b"])
# Columns "a" and "b" go through the FunctionTransformer wrapping
# calculate_growth; every other column is passed through unchanged.
mapper = ColumnTransformer(
    transformers=[("c", FunctionTransformer(calculate_growth), ["a", "b"])],
    remainder="passthrough",
    verbose_feature_names_out=False,
)
mapper.set_output(transform="pandas")

pipe = Pipeline(steps=[("mapper", mapper), ("classifier", DecisionTreeClassifier())])

# Four training rows, given column by column.
data = DataFrame(
    {
        "a": [2, 50, 5, 100],
        "b": [1, 4, 2, 6],
        "f": [5, 10, 4, 20],
    }
)
y = np.array([0, 1, 0, 1], dtype=np.int64)
pipe.fit(data, y)

# First conversion attempt; the error is printed instead of propagated.
try:
    to_onnx(pipe, data[:1], options={"zipmap": False})
except Exception as exc:
    print("It does not work:", exc)
It does not work: FunctionTransformer is not supported unless the transform function is None (= identity). You may raise an issue at https://github.com/onnx/sklearn-onnx/issues.
使用自訂轉換器¶
如果把 FunctionTransformer 所包裝的函式改寫成自訂的 scikit-learn 轉換器類別(transformer),就更容易為它編寫自訂的 ONNX 轉換器(converter)。
class GrowthCalculator(BaseEstimator, TransformerMixin):
    """Stateless transformer computing ``100 * (a - b) / b``.

    It reads the columns ``a`` and ``b`` of the input frame and outputs a
    single column holding the result.
    """

    def __init__(self):
        pass

    def calculate_growth(self, x, y):
        """Return ``100 * (x - y) / y``."""
        return 100 * (x - y) / y

    def fit(self, X, y=None):
        """Do nothing and return ``self``; there is nothing to learn."""
        return self

    def transform(self, X, y=None):
        """Compute the growth for every row of ``X``.

        Parameters
        ----------
        X : pandas.DataFrame
            Frame with the columns ``a`` and ``b``.
        y : ignored
            Present for API compatibility only.

        Returns
        -------
        numpy.ndarray
            Array of shape ``(n_rows, 1)``.
        """
        # Operate on whole columns rather than applying a Python lambda
        # to each row: same values, no per-row overhead.
        growth = self.calculate_growth(X["a"], X["b"])
        return growth.values.reshape((-1, 1))
mapper = ColumnTransformer(
    transformers=[
        # FunctionTransformer without a function: columns "a" and "b"
        # are kept as they are.
        ("ab", FunctionTransformer(), ["a", "b"]),
        # The custom transformer adds the new column.
        ("c", GrowthCalculator(), ["a", "b"]),
    ],
    remainder="passthrough",
    verbose_feature_names_out=False,
)

pipe_tr = Pipeline(
    steps=[("mapper", mapper), ("classifier", DecisionTreeClassifier())]
)
pipe_tr.fit(data, y)
兩個管線都會傳回相同的輸出。
# Both pipelines must predict the same probabilities.
assert_allclose(
    actual=pipe.predict_proba(data),
    desired=pipe_tr.predict_proba(data),
)
讓我們檢查它是否產生相同數量的特徵。
# Compare the output of the first step ("mapper") of both pipelines.
assert_allclose(
    pipe.named_steps["mapper"].transform(data),
    pipe_tr.named_steps["mapper"].transform(data),
)
但轉換仍然失敗,並顯示不同的錯誤訊息。
# Second conversion attempt; the error is printed instead of propagated.
try:
    to_onnx(pipe_tr, data[:1], options={"zipmap": False})
except Exception as exc:
    print("It does not work:", exc)
It does not work: Unable to find a shape calculator for type '<class '__main__.GrowthCalculator'>'.
It usually means the pipeline being converted contains a
transformer or a predictor with no corresponding converter
implemented in sklearn-onnx. If the converted is implemented
in another library, you need to register
the converted so that it can be used by sklearn-onnx (function
update_registered_converter). If the model is not yet covered
by sklearn-onnx, you may raise an issue to
https://github.com/onnx/sklearn-onnx/issues
to get the converter implemented or even contribute to the
project. If the model is a custom model, a new converter must
be implemented. Examples can be found in the gallery.
自訂轉換器¶
我們需要在 ONNX 中實作方法 calculate_growth。第一個函式會傳回預期的類型和形狀。
def growth_shape_calculator(operator):
    """Set the type and shape of the single output of ``operator``.

    The output uses the same type class as the first input (the
    transformer works on any numerical type) and has the shape
    ``[first input dimension, 1]``.
    """
    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1)
    source = operator.inputs[0]
    tensor_type = source.type.__class__
    # The first dimension is usually dynamic (batch dimension).
    batch_dim = source.get_first_dimension()
    operator.outputs[0].type = tensor_type([batch_dim, 1])
def growth_converter(scope, operator, container):
    """Add the ONNX nodes computing ``100 * (X[:, 0] - X[:, 1]) / X[:, 1]``.

    The fitted estimator is not retrieved: it holds no trained state.
    The nodes are built for ``container.target_opset`` and the final
    node writes into ``operator.outputs[0]``.
    """
    opset = container.target_opset
    features = operator.inputs[0]

    idx0 = np.array([0], dtype=np.int64)
    idx1 = np.array([1], dtype=np.int64)
    idx2 = np.array([2], dtype=np.int64)
    factor = np.array([100], dtype=np.float32)

    # Slice(data, starts, ends, axes): idx1 is used both as a bound and
    # as the axis.
    first = OnnxSlice(features, idx0, idx1, idx1, op_version=opset)
    second = OnnxSlice(features, idx1, idx2, idx1, op_version=opset)

    # The constant 100 is cast like the input before the multiplication.
    scale = OnnxCastLike(factor, features, op_version=opset)
    difference = OnnxSub(first, second, op_version=opset)
    ratio = OnnxDiv(difference, second, op_version=opset)
    growth = OnnxMul(
        scale,
        ratio,
        op_version=opset,
        output_names=operator.outputs[0],
    )
    growth.add_to(scope, container)
# Register the shape calculator and the converter for GrowthCalculator,
# then convert the pipeline with the first row as the input sample.
update_registered_converter(
    GrowthCalculator, "AliasGrowthCalculator", growth_shape_calculator, growth_converter
)
onx = to_onnx(
    pipe_tr,
    data.iloc[:1],
    target_opset=18,
    options={"zipmap": False},
)
讓我們檢查是否沒有差異¶
首先是預期的值
(array([0, 1, 0, 1]), array([[1., 0.],
[0., 1.],
[1., 0.],
[0., 1.]]))
然後,讓我們使用 onnx.reference.ReferenceEvaluator
檢查。
# Expected outputs computed by scikit-learn: (labels, probabilities).
# They must be defined here, the assertions below compare against them.
expected = (pipe_tr.predict(data), pipe_tr.predict_proba(data))

# One input per column, each reshaped into a single-column 2D array.
feeds = {name: data[name].values.reshape((-1, 1)) for name in ("a", "b", "f")}

# verbose=10 to show intermediate results
ref = ReferenceEvaluator(onx, verbose=0)
got = ref.run(None, feeds)
assert_allclose(expected[0], got[0])
assert_allclose(expected[1], got[1])
然後使用用於部署的執行階段,例如 onnxruntime。
# Same comparison, this time running the model with onnxruntime on CPU.
ref = InferenceSession(
    onx.SerializeToString(),
    providers=["CPUExecutionProvider"],
)
got = ref.run(None, feeds)
assert_allclose(actual=expected[0], desired=got[0])
assert_allclose(actual=expected[1], desired=got[1])
使用 onnxscript 的自訂轉換器¶
onnxscript 提供的 API 比 onnx 套件實作的 API 更簡潔。讓我們看看如何使用它來編寫轉換器。
@onnxscript.script()
def calculate_onnxscript_verbose(X):
    # Written with explicit operators taken from ``op`` (opset 18).
    # Slice arguments are (data, starts, ends, axes).
    first = op.Slice(X, [0], [1], [1])
    second = op.Slice(X, [1], [2], [1])
    difference = op.Sub(first, second)
    ratio = op.Div(difference, second)
    return op.Mul(ratio, 100)
此版本使用 ONNX 運算子的嚴格定義。如果使用常規 python 運算子,則程式碼可以更簡單。它們可能不會轉換為 ONNX,但在這種情況下會引發錯誤訊息。
@onnxscript.script()
def calculate_onnxscript(X):
    # onnxscript must define an opset. An identity node from a specific
    # opset is used to set it (otherwise it fails).
    features = op.Identity(X)
    first = features[:, :1]
    rest = features[:, 1:]
    ratio = (first - rest) / rest
    return ratio * 100
我們也可以檢查它是否等效於 python 實作。
# Compare the python implementation with the onnxscript function called
# directly on a float32 array holding the columns "a" and "b".
f_expected = calculate_growth(data)["c"].to_numpy()
f_got = calculate_onnxscript(data[["a", "b"]].to_numpy().astype(np.float32))
assert_allclose(actual=f_expected.ravel(), desired=f_got.ravel(), atol=1e-6)
讓我們在轉換器中使用它。
def growth_converter_onnxscript(scope, operator, container):
    """Convert GrowthCalculator using the onnxscript implementation.

    The fitted estimator is not retrieved: it holds no trained state.
    The graph computes 100 * (x-y)/y --> 100 * (X[0] - X[1]) / X[1].
    """
    model = calculate_onnxscript.to_model_proto()
    # The function is written with opset 18, it needs to be converted
    # to the opset required by the user when the conversion starts.
    converted = convert_version(model, container.target_opset)
    add_onnx_graph(scope, operator, container, converted)
# Register again for GrowthCalculator, this time with the
# onnxscript-based converter.
update_registered_converter(
    GrowthCalculator, "AliasGrowthCalculator", growth_shape_calculator, growth_converter_onnxscript
)
讓我們檢查它是否有效。
# Convert the pipeline with the first row as the input sample.
onx = to_onnx(
    pipe_tr,
    data.iloc[:1],
    target_opset=18,
    options={"zipmap": False},
)
再次檢查是否有差異。
# Run the newly converted model and compare with the expected outputs.
ref = ReferenceEvaluator(onx, verbose=0)
got = ref.run(None, feeds)
assert_allclose(actual=expected[0], desired=got[0])
assert_allclose(actual=expected[1], desired=got[1])
最後。
# Printed once every statement above has run without raising.
print("done.")
done.
腳本的總執行時間: (0 分鐘 0.222 秒)