/*
生产者生产消息
*/
public class MyProducer2 {
public static void main(String[] args) throws InterruptedException {
//生产者
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
//指定对应的key和value的序列化类型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
//应答机制:-1 all
properties.put(ProducerConfig.ACKS_CONFIG,"-1");
//多线程
ExecutorService executorService = Executors.newCachedThreadPool();
//模拟10个线程,同时向Kafka传递数据
long st = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
//一个线程代表一个人
KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
String threadName = Thread.currentThread().getName();
System.out.println(threadName);
//每个线程传递100000000条数据
for (int j = 0; j < 100000000; j++) {
ProducerRecord<String, String> record = new ProducerRecord<>("bigdata", threadName + " " + j);
producer.send(record);
}
}
});
executorService.execute(thread);
}
executorService.shutdown();
while (true){
//让主线程多休息1秒,主线程关闭,子线程没有跟上,所以数据缺失
Thread.sleep(1000);
if (executorService.isTerminated()){
System.out.println("game over!");
break;
}
}
long stop = System.currentTimeMillis();
System.out.println(stop-st);
}
}
结果
"C:\Program Files\Java\jdk1.8.0_144\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA 。。。。。。。。。 SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. pool-1-thread-1 pool-1-thread-6 pool-1-thread-9 pool-1-thread-5 pool-1-thread-3 pool-1-thread-7 pool-1-thread-8 pool-1-thread-2 pool-1-thread-4 pool-1-thread-10 game over! 2144794(约一小时) |