folder Tahribat.com Forumları
linefolder Java
linefolder Apache Flink Cep Pattern Oluşturma



Apache Flink Cep Pattern Oluşturma

  1. KısayolKısayol reportŞikayet pmÖzel Mesaj
    kajmerpac
    kajmerpac's avatar
    Kayıt Tarihi: 03/Kasım/2008
    Erkek

    merhaba,

    uzun zamandır foruma girmiyordum. İş değişikliği yaptım bundan sonra düzenli takip edebilirim. Bir konuda çok sıkıştım bilgisi olan var ise yardım edebilir mi?

    Java da flink complex event processing ile akan datalar üzerinde kriterlere uyan dataları çekmek istiyorum. elimde karışık bir pattern olacak. Patterni oluşturmada takıldım yardımcı olabilecek var mı? Bişeyler yaptım ama çalışmıyor

    Pattern<ObjectNode, ?>  alarmPattern = Pattern.<ObjectNode>begin("first")
    .where(new IterativeCondition<ObjectNode>() {
    @Override
    public boolean filter(ObjectNode jsonNodes, Context<ObjectNode> context) throws Exception {
    for (Criterias criterias : rules.getCriteriaList()) {
    if (criterias.getCriteriaType().equals("equals")) {
    return jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue());
    } else if (criterias.getCriteriaType().equals("greaterThen")) {
    if (!jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue())) {
    return false;
    }
    int count = 0;
    for (ObjectNode node : context.getEventsForPattern("first")) {
    count += node.get("value").asInt();
    }
    return Integer.compare(count, 5) > 0;
    } else if (criterias.getCriteriaType().equals("lessThen")) {
    if (!jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue())) {
    return false;
    }
    int count = 0;
    for (ObjectNode node : context.getEventsForPattern("first")) {
    count += node.get("value").asInt();
    }
    return Integer.compare(count, 5) < 0;
    }

    }
    return false;
    }
    }).times(rules.getRuleCount());

     

  2. KısayolKısayol reportŞikayet pmÖzel Mesaj
    kajmerpac
    kajmerpac's avatar
    Kayıt Tarihi: 03/Kasım/2008
    Erkek

    var mıdır bilen?

  3. KısayolKısayol reportŞikayet pmÖzel Mesaj
    kajmerpac
    kajmerpac's avatar
    Kayıt Tarihi: 03/Kasım/2008
    Erkek

    Konuyu çözdüm. Belki lazım olan olur diye  aşağıda bir örnek paylaşıyorum.Örnekler mevcut birçok sitede fakat json ile işlem yapan örnek bulmak bira sıkıntı. Kafka serverına gelen jsonlar içerisinde EventType Click olan ve önceki gelen datalar ile o anki data üzerindeki quantity toplam değeri 100 den fazla olan kayıt için alert oluşturuyoruz.

    {"UserId":"1","EventType":"Click","Quantity":"40"}

    {"UserId":"2","EventType":"Click","Quantity":"40"}

    {"UserId":"1","EventType":"Click","Quantity":"40"}  burada alert oluşuyor.

     package Teydeb.cep_kafka;

     

    import com.fasterxml.jackson.databind.ObjectMapper;

    import com.fasterxml.jackson.databind.node.ObjectNode;

    import org.apache.flink.cep.CEP;

    import org.apache.flink.cep.PatternSelectFunction;

    import org.apache.flink.cep.PatternStream;

    import org.apache.flink.cep.pattern.Pattern;

    import org.apache.flink.core.fs.FileSystem;

    import org.apache.flink.streaming.api.datastream.DataStream;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;

    import util.*;

     

    import java.util.List;

    import java.util.Map;

    import java.util.Properties;

    import java.util.logging.Level;

    import java.util.logging.Logger;

     

    public class MainClass {

     

        public static void main( String[] args ) throws Exception

        {

            try {

                Logger.getLogger("org").setLevel(Level.OFF);

                Logger.getLogger("akka").setLevel(Level.OFF);

     

               

                    ObjectMapper mapper = new ObjectMapper();

                    Properties properties = new Properties();

                    properties.setProperty("zookeeper.connect", "localhost:2181");

                    properties.setProperty("bootstrap.servers", "localhost:9092");

     

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                    env.enableCheckpointing(1000);

     

                    FlinkKafkaConsumer010<ObjectNode> myConsumer = new FlinkKafkaConsumer010<>("demo", new JSONDeserializationSchema(),

                            properties);

                    DataStream<ObjectNode> dataStream = env.addSource(myConsumer);

     

    Pattern<ObjectNode, ?>  alarmPattern = Pattern.<ObjectNode>

                    begin("beforeCriteriaFilter").oneOrMore().where(new IterativeCondition<ObjectNode>() {

     @Override

                        public boolean filter(ObjectNode jsonNodes, Context<ObjectNode> context) throws Exception {

     

    return jsonNodes.get("EventType").asText.equals("Click");

    }

     

    }.followedBy("afterCriteriaFilter").where(new IterativeCondition<ObjectNode>() {

                        @Override

                        public boolean filter(ObjectNode jsonNodes, Context<ObjectNode> context) throws Exception {

    jsonValue = new BigDecimal(jsonNodes.get("quantity").asText());

    value = new BigDecimal("300");

    for (ObjectNode node : context.getEventsForPattern("beforeCriteriaFilter"))

    {

    jsonValue = jsonValue.add(new BigDecimal(node.get("quantity").asText()));

    }

    afterCriteria = jsonValue.compareTo(value) > 0;

    return afterCriteria;   

                        }

                    }).times(1);

                        

                        PatternStream<ObjectNode> patternStream = CEP.pattern(filteredDataStream, alarmPattern);

     

    DataStream<ObjectNode> alarms  alarms = patternStream.select(new PatternSelectFunction<ObjectNode, ObjectNode>() {

                            @Override

                            public ObjectNode select(Map<String, List<ObjectNode>> map) throws Exception {

                                ObjectNode objectNode = mapper.createObjectNode();

                              

                                objectNode.put("Value",map.get("beforeCriteriaFilter").get(0).toString());

     

                                return objectNode;

                            }

                        });

     

             

                    env.execute("Flink job");

                }

            }

            catch (Exception e)

            {

                e.printStackTrace();

               

            }

        }

    }

     

     

    kajmerpac tarafından 10/Kas/17 14:13 tarihinde düzenlenmiştir
Toplam Hit: 2142 Toplam Mesaj: 3
apache flink cep