Apache Flink Cep Pattern Oluşturma
-
merhaba,
uzun zamandır foruma girmiyordum. İş değişikliği yaptım bundan sonra düzenli takip edebilirim. Bir konuda çok sıkıştım bilgisi olan var ise yardım edebilir mi?
Java da flink complex event processing ile akan datalar üzerinde kriterlere uyan dataları çekmek istiyorum. elimde karışık bir pattern olacak. Patterni oluşturmada takıldım yardımcı olabilecek var mı? Bişeyler yaptım ama çalışmıyor
Pattern<ObjectNode, ?> alarmPattern = Pattern.<ObjectNode>begin("first")
.where(new IterativeCondition<ObjectNode>() {
@Override
public boolean filter(ObjectNode jsonNodes, Context<ObjectNode> context) throws Exception {
for (Criterias criterias : rules.getCriteriaList()) {
if (criterias.getCriteriaType().equals("equals")) {
return jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue());
} else if (criterias.getCriteriaType().equals("greaterThen")) {
if (!jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue())) {
return false;
}
int count = 0;
for (ObjectNode node : context.getEventsForPattern("first")) {
count += node.get("value").asInt();
}
return Integer.compare(count, 5) > 0;
} else if (criterias.getCriteriaType().equals("lessThen")) {
if (!jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue())) {
return false;
}
int count = 0;
for (ObjectNode node : context.getEventsForPattern("first")) {
count += node.get("value").asInt();
}
return Integer.compare(count, 5) < 0;
}
}
return false;
}
}).times(rules.getRuleCount()); -
var mıdır bilen?
-
Konuyu çözdüm. Belki lazım olan olur diye aşağıda bir örnek paylaşıyorum.Örnekler mevcut birçok sitede fakat json ile işlem yapan örnek bulmak bira sıkıntı. Kafka serverına gelen jsonlar içerisinde EventType Click olan ve önceki gelen datalar ile o anki data üzerindeki quantity toplam değeri 100 den fazla olan kayıt için alert oluşturuyoruz.
{"UserId":"1","EventType":"Click","Quantity":"40"}
{"UserId":"2","EventType":"Click","Quantity":"40"}
{"UserId":"1","EventType":"Click","Quantity":"40"} burada alert oluşuyor.
package Teydeb.cep_kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import util.*;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
public class MainClass {
public static void main( String[] args ) throws Exception
{
try {
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
ObjectMapper mapper = new ObjectMapper();
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("bootstrap.servers", "localhost:9092");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
FlinkKafkaConsumer010<ObjectNode> myConsumer = new FlinkKafkaConsumer010<>("demo", new JSONDeserializationSchema(),
properties);
DataStream<ObjectNode> dataStream = env.addSource(myConsumer);
Pattern<ObjectNode, ?> alarmPattern = Pattern.<ObjectNode>
begin("beforeCriteriaFilter").oneOrMore().where(new IterativeCondition<ObjectNode>() {
@Override
public boolean filter(ObjectNode jsonNodes, Context<ObjectNode> context) throws Exception {
return jsonNodes.get("EventType").asText.equals("Click");
}
}.followedBy("afterCriteriaFilter").where(new IterativeCondition<ObjectNode>() {
@Override
public boolean filter(ObjectNode jsonNodes, Context<ObjectNode> context) throws Exception {
jsonValue = new BigDecimal(jsonNodes.get("quantity").asText());
value = new BigDecimal("300");
for (ObjectNode node : context.getEventsForPattern("beforeCriteriaFilter"))
{
jsonValue = jsonValue.add(new BigDecimal(node.get("quantity").asText()));
}
afterCriteria = jsonValue.compareTo(value) > 0;
return afterCriteria;
}
}).times(1);
PatternStream<ObjectNode> patternStream = CEP.pattern(filteredDataStream, alarmPattern);
DataStream<ObjectNode> alarms alarms = patternStream.select(new PatternSelectFunction<ObjectNode, ObjectNode>() {
@Override
public ObjectNode select(Map<String, List<ObjectNode>> map) throws Exception {
ObjectNode objectNode = mapper.createObjectNode();
objectNode.put("Value",map.get("beforeCriteriaFilter").get(0).toString());
return objectNode;
}
});
env.execute("Flink job");
}
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
kajmerpac tarafından 10/Kas/17 14:13 tarihinde düzenlenmiştir
