职业规划决策图

图片发自简书App




package example;

import java.util.Properties;

import org.apache.storm.Config;

import org.apache.storm.StormSubmitter;

import org.apache.storm.generated.AlreadyAliveException;

import org.apache.storm.generated.AuthorizationException;

import org.apache.storm.generated.InvalidTopologyException;

import org.apache.storm.kafka.bolt.KafkaBolt;

import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;

import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;

import org.apache.storm.kafka.spout.KafkaSpout;

import org.apache.storm.kafka.spout.KafkaSpoutConfig;

import org.apache.storm.testing.IdentityBolt;

import org.apache.storm.topology.TopologyBuilder;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

public class KafkaStormKafkaTopology {

public static void main(String[] args)throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {

final TopologyBuilder builder =new TopologyBuilder();

final Fields fields =new Fields("topic","key","message");

// Properties

        Properties props =new Properties();

props.put("bootstrap.servers","10.10.10.21:21007,10.10.10.22:21007,10.10.10.23:21007");

props.put("acks","1");

props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

props.put("security.protocol","SASL_PLAINTEXT");

props.put("sasl.jaas.config","com.sun.security.auth.module.Krb5LoginModule required "

                    +"useTicketCache=false "

                    +"renewTicket=true "

                    +"serviceName=\"kafka\""

                    +"useKeyTab=true "

                    +"keyTab=\"/root/test/user.keytab\""

                    +"principal=\"huangwj@HADOOP.COM\";");

// Kafka spout getting data from "inputTopicStorm"

        KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig

.builder(props.getProperty("bootstrap.servers"),"input")

.setGroupId("storm")

.setProp(props)

.setRecordTranslator((r) ->new Values(r.topic(), r.key(), r.value()),new Fields("topic","key","message"))

.build();

KafkaSpout kafkaSpout =new KafkaSpout<>(kafkaSpoutConfig);

// Identity bolt (just for testing, doing nothing)

        IdentityBolt identityBolt =new IdentityBolt(fields);

// Kafka bolt to send data into "outputTopicStorm"

        KafkaBolt kafkaBolt =new KafkaBolt()

.withProducerProperties(props)

.withTopicSelector(new DefaultTopicSelector("output"))

.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

// Building the topology: KafkaSpout -> Identity -> KafkaBolt

        builder.setSpout("kafka-spout", kafkaSpout);

builder.setBolt("identity", identityBolt).shuffleGrouping("kafka-spout");

builder.setBolt("kafka-bolt", kafkaBolt,2).globalGrouping("identity");

// Submit the topology

        Config conf =new Config();

StormSubmitter.submitTopology("Kafka-Storm-Kafka", conf, builder.createTopology());

}

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容