Flink CEP(Complex Event Processing)是一种基于流处理框架 Apache Flink 实现复杂事件处理的库,用于在流数据中检测复杂的事件模式。CEP可以帮助开发者从持续的、无界的数据流中提取出满足特定条件的事件序列,广泛应用于金融风控、网络监控、设备监控等领域。
1、Flink CEP基础概念
1.1、什么是复杂事件处理
复杂事件处理(Complex Event Processing,简称 CEP)是一种实时事件流处理技术,旨在从连续的事件流中找到符合预定义模式的事件序列。CEP 可以识别简单事件之间的关系,比如事件的顺序、时间窗口、数量限制等,进而推断出复杂事件。 举例来说,在金融风控中,银行可能想要监控某用户的可疑交易行为,比如短时间内连续进行了大额转账,这就是一个复杂事件。而每次的转账行为则是简单事件,通过 CEP 可以捕捉到符合条件的事件序列。
1.2、Flink CEP 的工作原理
Flink CEP 基于 Apache Flink 强大的流处理能力构建,它主要通过以下几个步骤实现复杂事件检测: 定义模式(Pattern Definition):首先,开发者需要定义一系列的事件模式,这些模式描述了希望在数据流中找到的事件序列。
应用模式(Pattern Application):接下来,Flink CEP 将这些模式应用到输入流上,进行模式匹配。
输出匹配结果(Select Matched Events):当发现某个事件序列符合定义的模式时,CEP会输出匹配结果。
1.3、Flink CEP 核心概念
Pattern:用于定义事件匹配的规则。例如,检测特定事件的顺序或间隔。 PatternStream:Flink CEP 模式匹配的结果流,包含了所有匹配到模式的事件。 Time Windows:Flink CEP 支持基于时间窗口的模式匹配,可以设置时间限制,控制匹配事件的时间跨度。 Quantifiers:在定义模式时,可以使用量词(Quantifiers)来控制事件匹配的数量,比如 “一次或多次”、“至少一次” 等。
2、Flink CEP 的应用场景
Flink CEP 在很多场景中都得到了应用,包括: 金融风控:检测交易欺诈、异常资金流动等; 网络安全:识别 DDoS 攻击等复杂网络威胁; 物联网设备监控:监测设备的异常行为或故障; 物流跟踪:追踪货物的运输过程,检测延迟、丢失等问题; 电商推荐系统:基于用户的行为序列,生成个性化推荐。
3、详细案例讲解
以“用户连续10s内登陆失败超过3次告警 ”为需求来讲解详细的功能
CEP编程步骤总结:
- a)定义模式序列
Pattern.
- b)将模式序列作用到流上
CEP.pattern(inputDataStream,pattern) CEP.pattern()是固定格式写法, 其中第一个参数,表示需要具体作用的流; 第二个参数,表示具体的自定义的模式。
- c)提取匹配上的数据和输出
由b)生成的流用process API来进行数据处理输出,继承PatternProcessFunction,重写processMatch(Map<String, List
> pattern,Context ctx,Collector out)方法, 第一个参数,表示具体匹配上的数据,其中Map的key就是a)步骤中定义的"patternName"名称,value就是该名称具体对应规则匹配上的数据集; 第二个参数,表示没匹配上的数据侧输出流 第三个参数,表示具体该函数处理完,需要对外输出的内容收集。
3.1、代码段讲解
下面就以从Socket中模拟读取用户操作日志数据,来进行数据CEP匹配数据输出。
以如下代码把读进来的数据进行数据打平成JavaBean。该章节的讲解以代码段进行,后续章节会把demo代码全部贴出来。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* 设置成1,是为了能够触发watermark来计算
*/
env.setParallelism(1);
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction())
.assignTimestampsAndWatermarks(
WatermarkStrategy.<UserLoginLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((SerializableTimestampAssigner<UserLoginLog>) (element, recordTimestamp) -> element.getLoginTime())
);
3.1.1、使用begin.where.next.where.next
/**
* 10s钟之内连续3次登陆失败的才输出,强制连续
*/
Pattern<UserLoginLog, UserLoginLog> wherePatternOne = Pattern.<UserLoginLog>begin("start").where(new SimpleCondition<UserLoginLog>() {
@Override
public boolean filter(UserLoginLog value) throws Exception {
return 1 == value.getLoginStatus();
}
}).next("second").where(new IterativeCondition<UserLoginLog>() {
@Override
public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
return 1 == value.getLoginStatus();
}
}).next("third").where(new SimpleCondition<UserLoginLog>() {
@Override
public boolean filter(UserLoginLog value) throws Exception {
return 1 == value.getLoginStatus();
}
}).within(Time.seconds(10));
如上根据设置判断登陆状态是否为失败开始计数,连续第二条,第三条如果也同样为失败的话,就会输出
//如下日志数据输入,最终将输出loginId为:11111、11112、11113、11116、11117、11121
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
3.1.2、使用begin.times
/**
* 10s钟之内连续3次登陆失败的才输出,不强制连续
*/
Pattern<UserLoginLog, UserLoginLog> wherePatternTwo = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
@Override
public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
return 1 == value.getLoginStatus();
}
}).times(3).within(Time.seconds(10));
如上根据设置判断登陆状态是否为失败开始计数,只要在10秒之内出现第二条,第三条如果也同样为失败的话,就会输出,该本质就是不需要连续出现。
//如下日志数据输入,最终将输出loginId为:11111、11112、11113、11116、11117、11118、11119、11121
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
3.1.3、使用begin.times.consecutive
/**
* 10s钟之内连续3次登陆失败的才输出,加上 consecutive 之后 就是 强制连续输出
*/
Pattern<UserLoginLog, UserLoginLog> wherePatternThree = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
@Override
public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
return 1 == value.getLoginStatus();
}
}).times(3).consecutive().within(Time.seconds(10));
如上在比3.1.2的基础上多加了一个consecutive之后,就变成跟3.1.1一样的效果
//如下日志数据输入,最终将输出loginId为:11111、11112、11113、11116、11117、11121
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
4、本案例所有代码
关注本公众号,回复“flink CEP”获取本案例所有代码!
评论