kafka详解 | 您所在的位置:网站首页 › cmd启动tomcat › kafka详解 |
安装单机版集群基础名词JavaAPI
安装
单机版https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz 启动自带的zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties &启动kafka bin/kafka-server-start.sh config/server.properties &测试 # 创建主题bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test# 查看主题bin/kafka-topics.sh --list --zookeeper localhost:2181bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic userLog# 启动生产者,启动后,在命令行下每输入一些字符串按下回车时,就作为一个消息并发送的kafkabin/kafka-console-producer.sh --broker-list localhost:9092 --topic test# 启动消费者,另启动一个窗口bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 查看指定主题 集群修改配置文件即可 基础 名词Broker:消息中间件处理节点;每个Kafka服务节点称之为一个Broker,一个Kafka集群由一个或多个Broker组成 Topic:一类特定数据集合的统称;可类比DB中Table的概念;逻辑概念 Producer:消息的生产者,向Broker发送消息的客户端 Consumer:消息的消费者,向Broker读取消息的客户端 Consumer Group:每一个Consumer隶属于一个特定的Consumer Group,一条消息可以被不同Group中的Consumer消费,但同一Group内的消息只能被一个Consumer消费 Partition:是对Topic中所包含数据集的物理分区;物理概念 Replication:副本集;是Kafka高可用的一种保障机制 高性能 JavaAPI不同版本API可能会有区别 org.apache.kafka kafka_2.11 1.0.0 package com.alvin.controller;import kafka.Kafka;import kafka.producer.KeyedMessage;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;import java.util.UUID;/** * @Description * @Author 田云 * @Date 2021/3/21 11:07 * @Version 1.0 */public class KafkaProducerSimple { public static void main(String[] args) { String TOPIC = "orderMq"; //读取配置文件 Properties props = new Properties(); //1.指定Kafaka集群的ip地址和端口号 props.put("bootstrap.servers","localhost:9092"); //2.等待所有副本节点的应答 props.put("acks","all"); //3.消息发送最大尝试次数 props.put("retries",0); //4.指定一批消息处理次数 props.put("batch.size",16384); //5.指定请求延时 props.put("linger.ms",1); //6.指定缓存区内存大小 props.put("buffer.memory",33554432); //7.设置key序列化 props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); //8.设置value序列化 props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //可选配置用来把消息分到各个partition中,默认是kafka.producer.DefaultPartioner,即对key进行hash //props.put("partitioner.class", "cn.mylove.storm.kafka.MyLogPartitioner"); //通过配置文件创建生产者 KafkaProducer producer = new KafkaProducer(props); for (int i = 0; i |
CopyRight 2018-2019 实验室设备网 版权所有 |