深入浅出Flink CEP:原理、应用与代码示例

Flink CEP(Complex Event Processing)是一种基于流处理框架 Apache Flink 实现复杂事件处理的库,用于在流数据中检测复杂的事件模式。CEP可以帮助开发者从持续的、无界的数据流中提取出满足特定条件的事件序列,广泛应用于金融风控、网络监控、设备监控等领域。

1、Flink CEP基础概念

1.1、什么是复杂事件处理

复杂事件处理(Complex Event Processing,简称 CEP)是一种实时事件流处理技术,旨在从连续的事件流中找到符合预定义模式的事件序列。CEP 可以识别简单事件之间的关系,比如事件的顺序、时间窗口、数量限制等,进而推断出复杂事件。 举例来说,在金融风控中,银行可能想要监控某用户的可疑交易行为,比如短时间内连续进行了大额转账,这就是一个复杂事件。而每次的转账行为则是简单事件,通过 CEP 可以捕捉到符合条件的事件序列。

Flink CEP 基于 Apache Flink 强大的流处理能力构建,它主要通过以下几个步骤实现复杂事件检测: 定义模式(Pattern Definition):首先,开发者需要定义一系列的事件模式,这些模式描述了希望在数据流中找到的事件序列。

应用模式(Pattern Application):接下来,Flink CEP 将这些模式应用到输入流上,进行模式匹配。

输出匹配结果(Select Matched Events):当发现某个事件序列符合定义的模式时,CEP会输出匹配结果。

Pattern:用于定义事件匹配的规则。例如,检测特定事件的顺序或间隔。 PatternStream:Flink CEP 模式匹配的结果流,包含了所有匹配到模式的事件。 Time Windows:Flink CEP 支持基于时间窗口的模式匹配,可以设置时间限制,控制匹配事件的时间跨度。 Quantifiers:在定义模式时,可以使用量词(Quantifiers)来控制事件匹配的数量,比如 “一次或多次”、“至少一次” 等。

2、Flink CEP 的应用场景

Flink CEP 在很多场景中都得到了应用,包括: 金融风控:检测交易欺诈、异常资金流动等; 网络安全:识别 DDoS 攻击等复杂网络威胁; 物联网设备监控:监测设备的异常行为或故障; 物流跟踪:追踪货物的运输过程,检测延迟、丢失等问题; 电商推荐系统:基于用户的行为序列,生成个性化推荐。

3、详细案例讲解

以“用户连续10s内登陆失败超过3次告警 ”为需求来讲解详细的功能

图片

CEP编程步骤总结:

  • a)定义模式序列

Pattern.begin("patternName").API... 基本都是按照如上的套路来新建自定义一个模式规则

  • b)将模式序列作用到流上

CEP.pattern(inputDataStream,pattern) CEP.pattern()是固定格式写法, 其中第一个参数,表示需要具体作用的流; 第二个参数,表示具体的自定义的模式。

  • c)提取匹配上的数据和输出 由b)生成的流用process API来进行数据处理输出,继承PatternProcessFunction,重写processMatch(Map<String, List> pattern,Context ctx,Collector out)方法, 第一个参数,表示具体匹配上的数据,其中Map的key就是a)步骤中定义的"patternName"名称,value就是该名称具体对应规则匹配上的数据集; 第二个参数,表示没匹配上的数据侧输出流 第三个参数,表示具体该函数处理完,需要对外输出的内容收集。

3.1、代码段讲解

下面就以从Socket中模拟读取用户操作日志数据,来进行数据CEP匹配数据输出。

以如下代码把读进来的数据进行数据打平成JavaBean。该章节的讲解以代码段进行,后续章节会把demo代码全部贴出来。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
/**
 * 设置成1,是为了能够触发watermark来计算
 */
env.setParallelism(1);
 
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
 
SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserLoginLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((SerializableTimestampAssigner<UserLoginLog>) (element, recordTimestamp) -> element.getLoginTime())
                );

3.1.1、使用begin.where.next.where.next

/**
 * 10s钟之内连续3次登陆失败的才输出,强制连续
 */
Pattern<UserLoginLog, UserLoginLog> wherePatternOne = Pattern.<UserLoginLog>begin("start").where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).next("second").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
 
                return 1 == value.getLoginStatus();
            }
        }).next("third").where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
 
                return 1 == value.getLoginStatus();
            }
        }).within(Time.seconds(10));

如上根据设置判断登陆状态是否为失败开始计数,连续第二条,第三条如果也同样为失败的话,就会输出

//如下日志数据输入,最终将输出loginId为:11111、11112、11113、11116、11117、11121

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}

图片

3.1.2、使用begin.times


/**
 * 10s钟之内连续3次登陆失败的才输出,不强制连续
 */
Pattern<UserLoginLog, UserLoginLog> wherePatternTwo = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).times(3).within(Time.seconds(10));

如上根据设置判断登陆状态是否为失败开始计数,只要在10秒之内出现第二条,第三条如果也同样为失败的话,就会输出,该本质就是不需要连续出现。

//如下日志数据输入,最终将输出loginId为:11111、11112、11113、11116、11117、11118、11119、11121

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}

图片

3.1.3、使用begin.times.consecutive

/**
 * 10s钟之内连续3次登陆失败的才输出,加上 consecutive 之后 就是 强制连续输出
 */
Pattern<UserLoginLog, UserLoginLog> wherePatternThree = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).times(3).consecutive().within(Time.seconds(10));

如上在比3.1.2的基础上多加了一个consecutive之后,就变成跟3.1.1一样的效果

//如下日志数据输入,最终将输出loginId为:11111、11112、11113、11116、11117、11121

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}

4、本案例所有代码

关注本公众号,回复“flink CEP”获取本案例所有代码!

end
  • 作者:lishe (联系作者)
  • 发表时间:2024-10-15 16:27
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 转载声明:如果是转载博主转载的文章,请附上原文链接
  • 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  • 评论