流式处理方法可以有效地处理实际应用程序的有限内存和处理时间要求。这些方法只存储和处理一个实例或最近实例的一个小窗口。近年来,已经为异常检测和流数据开发了各种免费和开源软件 (FOSS)。然而,这些软件包要么缺乏对异常检测的关注,要么是为批量数据而设计的。我们将现有框架分类为 (i) 流框架和 (ii) 异常检测框架。流式框架的重点不仅在于异常检测,还在于流数据上的其他机器学习任务,例如分类和回归。因此,由于需要同时为其他任务维护不同的方法,它们的专用流异常检测方法数量有限。我们特别专注于流异常检测并引入了一个综合框架,因为这些方法具有高速和有限内存的性质,可以很容易地适应现实生活中的应用程序
https://embed.notionlytics.com/wt/ZXlKd1lXZGxTV1FpT2lJeE9UQTJNbVZtWVRVNE1EazBaV1JtT0RGbVlUY3haV1UzWm1NME5tRmtaQ0lzSW5kdmNtdHpjR0ZqWlZSeVlXTnJaWEpKWkNJNklsZHNTR2hsVEZSUFdXeHpaVmRhUW1ZNU1YQmxJbjA9
我们在 Python 中引入了流式异常检测框架,提供专为单变量和多变量数据设计的模型。 此外,人们可以通过此方法在有监督、半监督和无监督的环境中进行实验。此方法包含流模拟器、评估器、预处理器、统计跟踪器、后处理器、概率校准器等。
示例:使用 Python 测试流处理
security = SecurityOptions(CERTIFICATE_PATH, USERNAME, PASSWORD)
client = StreamingClient(BROKER_ADDRESS, security)
input_topic = client.open_input_topic(INPUT_TOPIC_ID, "example-consumer-group")
output_topic = client.open_output_topic(TOPIC_ID)
def read_stream(new_stream: StreamReader):
stream_writer = output_topic.create_stream()
buffer = new_stream.parameters.create_buffer()
buffer.time_span_in_milliseconds = 100
def on_pandas_frame_handler (df: pandas.DataFrame):
output_df = pandas.DataFrame()
output_df["time"] = df["time"]
output_df["result"] = df.apply(lambda row: "True" if row.speed > 100 else "False", axis=1)
stream_writer.parameters.write(output_df)
buffer.on_read_pandas += on_pandas_frame_handler
input_topic.on_stream_received += read_stream
input_topic.start_reading()
流式异常检测模型 $\mathcal{M}$ 接收数据流 $\mathcal{D}=\left\{\left(\boldsymbol{x}{t}, y{t}\right) \mid t=1,2, \ldots\right\}$,其中 $\boldsymbol{x}_{t} \in \mathbb{R}^{m}$ 是输入维度为 $m$ 的列向量,$y_t$ 定义为:
$$ y_{t}= \begin{cases}1, & \text { 如果 } \boldsymbol{x}_{t} \text { 异常 } \\ 0, & \text { 否则 }\end{cases} $$
请注意,无监督的模型不需要 $y_t$。 在流动异常检测中,模型作为新实例 $\left(\boldsymbol{x}{t}, y{t}\right)$ 接收数据,并且将实例存储在有限的时间和内存中,或直接训练自身,但与批处理模型不同,其可以在训练期间,使用有限数量的实例访问所有数据 $\mathcal{D}$。
为了在我们的 API 中描述模型,我们为任何 $n \in \mathbb{Z}^{+}$ 定义 $\boldsymbol{X}=\left[\begin{array}{lll}\boldsymbol{x}{1} & \ldots & \boldsymbol{x}{n}\end{array}\right]$ 和 $\boldsymbol{y}=\left[\begin{array}{lll}y_{1} & \cdots & y_{n}\end{array}\right]^{T}$。 此方法中所有模型都扩展了提供以下接口的 BaseModel。
我们使用 numpy 包进行线性代数运算和数据结构,将 scikit-learn 用于实用程序和投影方法,用 scipy 进行优化和高效的线性代数运算,使用一个异常检测包,为批处理数据提供一组丰富的异常检测器,提供集成器和流隔离森林模型实现。