FlinkCEP在实时风控场景的落地与优化 演讲人-耿飙-阿里云计算平台事业部-开发工程师 DataFunSummit#2023 目录CONTENT 01FlinkCEP介绍 03CEPSQL语法增强&性能优化 02动态多规则支持04风控场景实际案例 01 FlinkCEP介绍 DataFunSummit#2023 •CEP:复杂事件处理(ComplexEventProcessing) •FlinkCEP:基于Flink实现的复杂事件处理库,它可以识别出数据流中符合特定模式(Pattern)的事件序列,并允许用户作出针对性处理。 A B B C a1 b1 b2 d2 c1 e1 a2 b2 c1 模式 d1 a1 b1 事件流匹配 ... 实时风控 风险用户检测:5分钟内转账次数超过10次且金额大于10000 实时营销 营销策略优化:大促期间在购物车中添加了超过3次商品后,但最后没有结账付款的用户 物联网 异常状态告警:共享单车被骑出指定区域且15分钟内没有回到指定区域后发出告警 02 动态多规则支持 DataFunSummit#2023 •为什么需要支持动态规则更新? 阈值 •频繁变化的实际场景要求对初始规则的内容进行调整或者添加新的规则,而重启Flink作业来使变化后的规则生效的方式时间成本高、影响范围大 规则 条件 事实 •关键问题: •如何让Flink作业不停机加载新规则? •如何解决Pattern的(de)serialization? •现有方案: •修改CepOperator添加注入规则的接口 •基于Groovy引擎动态生成Pattern对象 例子: 5分钟内通过广告链接访问某商品超过5次但最终没有购买 3分钟内访问超过7次最终没有购买 •新增接口(FLIP-200) •PatternProcessor •id •version •timestamp •pattern •patternProcessorFunction •PatternProcessorDiscoverer •PatternProcessorManager JobManagerTaskManager Database poll P OperatorCoordinatorPatternProcessor Discoverer R DynamicCEPOp P PNFANFA TaskManager DynamicCEPOp R DownstreamOp UpstreamOp R NFANFA P :SerializedPatternProcessor R :Record Pattern的抽象: •NFA状态转换图 •子Pattern:节点 •事件选择策略:边 Graph 规则的(de)serialization格式设计原则: •表达能力完整 •方便序列化反序列化 •易于拓展,方便集成 •可读可编辑 JSON Pattern<Event,Event>pattern=Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) .where(newStartCondition("action==0")) .timesOrMore(3) .followedBy("end") .where(newEndCondition()); e1 P1 e2 P2 AviatorCondition: •结合Java反射机制,使用Aviator引擎解析表达式字符串 publicclassEvent[privatefinalintid; privatefinalStringname; privatefinaldoubleprice;privatefinalintaction;privatefinallongeventTime; ] •原理 •Compileexpressioninconstructor •Executeexpressionwithvariablesinfilter() •示例 •AviatorCondition(‘action==1&&price>20’) •AviatorCondition(‘action==0&&price>50’) GroovyCondition:支持Groovy语法,将Groovy表达式作为参数 CustomArgsCondition:自定义参数 如何在同一输入流应用多条规则? 多个PatternStream,多个CEPOperator,多个NFA数据要复制多次 一个PatternStream,一个CEPOperator,多个NFA数据只需传递一次 CEPOp UpstreamOp CEPOp CEPOp DynamicCEPOp UpstreamOp NFANFA NFA 场景:广告投放中的实时反作弊https://help.aliyun.com/document_detail/459880.html https://github.com/RealtimeCompute/ververica-cep-demo 03 CEPSQL语法增强&性能优化 DataFunSummit#2023 选择事件表 定义逻辑分区定义事件顺序 定义CEP输出 定义序列模式定义变量条件 源表 结果表 案例场景:用户行为模式识别 用户从流量入口进入产品边界,执行一系列的操作后最终完成价值转化。识别整体流程周期在10分钟之内的高质量用户。 源表 结果表 结果表 JavaAPI SQL 策略 样例匹配序列 A.next(B) (A B) 严格连续:期望所有匹配事件严格的一个接一个出现,中间没有任何不匹配的事件。 [[ a1a3 b1b2 ]] A.followedBy(B) (A[-X*?-]B)X为未在DEFINE中定义的变量,下同 松散连续:忽略匹配事件之间的不匹配事件。 [[[ a1a2a3 b1b2b2 ]]] A.followedByAny(B) (A [-X*-]B) 非确定性松散连续:更进一步的松散连续,允许忽略掉一些匹配事件。 [[[ a1a2a3 b1b2b2 ]]] A.notNext(B) (A [^B]) 严格非连续:期望事件之后不紧接出现另一事件。 [ a2 ] A.notFollowedBy(B) (A [-X*?-][^B]) 松散非连续:期望一个事件不出现在两个事件之间的任何地方。 无匹配 源表 条件变量 FlinkCEPSQL:语法增强 03定义循环模式中的连续性和贪婪性 标识符 连续性 贪婪性 示例模式 等效JavaAPI 样例匹配序列 无 严格连续 贪婪 (A+C) A.oneOrMore().consecutive().greedy().next(C) [a2[a3 a3c1]c1] ? 严格连续 非贪婪 (A+?C) A.oneOrMore().consecutive().next(C) [a2[a3 a3]c1] ?? 松散连续 贪婪 (A+??C) A.oneOrMore().greedy().next(C) [a1[a2[a3 a2a3c1]a3c1]c1] ??? 松散连续 非贪婪 (A+???C) A.oneOrMore().next(C) [a1[a2[a3 a2a3]a3]c1] 源表 条件变量 FlinkCEPSQL:语法增强 04循环模式指定停止条件(Until) 模式 等效JavaAPI 样例匹配序列 说明 (A+C) A.oneOrMore().consecutive().greedy().next(C) [a2b1a3c1][b1a3c1][a3c1] 以a或b开头的事件都能匹配A模式,A模式内部和AC之间为严格连续。由于a1、a2之间存在d1,无法从a1开始匹配 (A+[B]C) A.oneOrMore().consecutive().greedy().until(B).next(C) [a3c1] A循环模式增加了untilB条件,AC之间仍为严格连续。由于a2开始的循环模式需要在b1处结束,无法满足与c1之间的严格连续要求 (A+[B][-X*?-]C) A.oneOrMore().consecutive().greedy().until(B).followedBy(C) [a2c1][a3c1] AC之间为松散连续,以a2开始的循环模式在b1处结束,并跳过b1、a3匹配c1 (A+??[B][-X*?-]C) A.oneOrMore().greedy().until(B).followedBy(C) [a1a2c1][a2c1][a3c1] 循环模式A内部为松散连续,可跳过d1并结束于b1,匹配a1、a2 源表 type content A a1 D d1 A a2 B b1 A a3 C c1 条件变量 FlinkCEPSQL:语法增强 05组合模式(GroupPattern) 组合模式(grouppattern):将多个模式组合为一个整体用在next()、followedBy()和followedByAny()函数中,并支持整体的循环。 在阿里云实时计算Flink版的SQL作业中使用SQL标准中的括号语法(...)来定义组合模式,支持使用循环量词如+、*、[3,]等。 PATTERN(A(BC*)+?D) a1b1b2c1b3c2c3d1 a1b1b2c1c3d1 Pattern.<String>begin("A").where(...) .next(Pattern.<String>begin("B").where(...) .next("C").where(...).oneOrMore().optional().greedy().consecutive()) .oneOrMore().consecutive() .next("D").where(...) FlinkCEPSQL:语法增强 06AFTERMATCHNOSKIP策略 SKIP_TO_NEXT_ROW:丢弃以相同事件开始的所有部分匹配(CEPSQL默认策略)NO_SKIP:每个成功的匹配都会被输出(JavaAPI默认策略) 模式:ab+,输入:a1b1b2b3: 阿里云实时计算Flink版扩展了SQL标准中的AFTERMATCH语句,可通过AFTERMATCHNOSKIP语句来声明NO_SKIP策略,NO_SKIP策略在完成一条序列的匹配时,不会终止或丢弃其他已经开始的匹配过程。 •减少State访问 •增加Cache、优化onEvent/ProcessingTime()实现 •修复State泄漏 •https://issues.apache.org/jira/browse/FLINK-23314 •对于部分生命周期较短的key,和其相关的computationStates没有及时清理,导致State不断增大。当key包含timestamp或随机ID时,容易出现该问题 •及时清除该key •Tip:使用Flink1.16及之上的FlinkCEP版本,减少Timer的注册,大大减少作业的CPU消耗(10x) •https://issues.apache.org/jira/browse/FLINK-23890 04 风控场景典型应用 DataFunSummit#2023 •交易风控 •一段时间内某个IP退款超过一定金额,触发熔断 •一段时间内某个IP退款超过一定次数,触发熔断 •内容风控 •某用户在X分钟内发布了超过Y条帖子,则进行账号禁言或其他处理 •物联网 •设备上报埋点到日志存储上,有成功和失败信息。如果某个设备连续发生10次以上的某类异常,并且超过15分钟未恢复则告警 •网络安全 •检测到某台电脑的行为满足