Pulsar生产实践
pulsar-client
maven
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>3.0.0</version>
</dependency>
实例化一个PulsarClient
对象,只需要为pulsar集群提供一个URL,如下所示:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
有多个代理,启动如下:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")
.build();
producer
实例化PulsarClient对象之后,就可以为特定的topic
创建一个Producer
。
java复制代码Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.create();
// You can then send messages to the broker and topic you specified:
producer.send("My message".getBytes());
默认情况下,producer生成由字节数组组成的消息。您可以通过指定消息模式来生成不同的类型,后面会详细介绍schema的定义。
// 指定消息类型为String
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("my-topic")
.create();
stringProducer.send("My message");
如果不需要时,请确保关闭他们。
producer.close();
consumer.close();
client.close();
//也可以异步关闭
producer.closeAsync()
.thenRun(() -> System.out.println("Producer closed"))
.exceptionally((ex) -> {
System.err.println("Failed to close producer: " + ex);
return null;
});
consumer
在Pulsar中,消费者订阅主题并处理生产者发布到这些主题的消息。您可以通过首先实例化PulsarClient对象并创建一个新的消费者。
在实例化了PulsarClient对象之后,可以通过指定主题和订阅名来创建Consumer。
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
subscribe
将自动订阅消费者指定的topic
。可以设置一个while循环来持续监听消息。在下面这个示例中,打印counser接收到的任何消息的内容,然后确认消息已被处理(acknowledgment
)。如果处理逻辑失败,您可以使用negative acknowledgment
稍后重新传递消息
while (true) {
// Wait for a message
Message msg = consumer.receive();
try {
// Do something with the message
System.out.println("Message received: " + new String(msg.getData()));
// Acknowledge the message
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
如果不想阻塞主线程,但又不断地监听新消息,可以考虑使用MessageListener
。MessageListener
使用客户机内部的线程池。您可以在ClientBuilder中设置消息侦听器的线程数。MessageListener将使用PulsarClient内部的线程池。您可以在ClientBuilder中设置用于消息侦听器的线程数。
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
}
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.messageListener(myMessageListener)
.subscribe();
reader
通过reader
接口,pulsar客户端可以在主题中“手动定位”自己,并从指定的消息中读取所有消息。Java的Pulsar API允许您通过指定主题和MessageId
来创建Reader对象。
示例如下:
// Some message ID byte array
byte[] msgIdBytes = MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
while (true) {
Message message = reader.readNext();
// Process message
}
在上面的例子中,为特定的Topic(按message ID)实例化了一个Reader对象,reader
遍历主题中的每条消息(如何获得该值取决于应用程序)。 也可以使用MessageId.earliest
指向关于MessageId主题的最早可用消息。MessageId.latest
指向最近可用的消息。一般通过Reader进行消息的重放.