Skip to content

Pulsar生产实践

pulsar-client

maven

xml
<dependency>  
    <groupId>org.apache.pulsar</groupId>  
    <artifactId>pulsar-client</artifactId>  
    <version>3.0.0</version>  
</dependency>

实例化一个PulsarClient对象,只需要为pulsar集群提供一个URL,如下所示:

java
PulsarClient client = PulsarClient.builder()  
.serviceUrl("pulsar://localhost:6650")  
.build();

有多个代理,启动如下:

java
PulsarClient client = PulsarClient.builder()  
.serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")  
.build();

producer

实例化PulsarClient对象之后,就可以为特定的topic创建一个Producer

java
java复制代码Producer<byte[]> producer = client.newProducer()  
.topic("my-topic")  
.create();  
  
// You can then send messages to the broker and topic you specified:  
producer.send("My message".getBytes());

默认情况下,producer生成由字节数组组成的消息。您可以通过指定消息模式来生成不同的类型,后面会详细介绍schema的定义。

java
// 指定消息类型为String
Producer<String> stringProducer = client.newProducer(Schema.STRING)  
.topic("my-topic")  
.create();  
stringProducer.send("My message");

如果不需要时,请确保关闭他们。

java
producer.close();
consumer.close();
client.close();

//也可以异步关闭
producer.closeAsync()  
.thenRun(() -> System.out.println("Producer closed"))  
.exceptionally((ex) -> {  
System.err.println("Failed to close producer: " + ex);  
return null;  
});

consumer

在Pulsar中,消费者订阅主题并处理生产者发布到这些主题的消息。您可以通过首先实例化PulsarClient对象并创建一个新的消费者。

在实例化了PulsarClient对象之后,可以通过指定主题和订阅名来创建Consumer。

java
Consumer consumer = client.newConsumer()  
.topic("my-topic")  
.subscriptionName("my-subscription")  
.subscribe();

subscribe将自动订阅消费者指定的topic。可以设置一个while循环来持续监听消息。在下面这个示例中,打印counser接收到的任何消息的内容,然后确认消息已被处理(acknowledgment)。如果处理逻辑失败,您可以使用negative acknowledgment稍后重新传递消息

java
while (true) {  
    // Wait for a message  
    Message msg = consumer.receive();  

    try {  
        // Do something with the message  
        System.out.println("Message received: " + new String(msg.getData()));  

        // Acknowledge the message  
        consumer.acknowledge(msg);  
    } catch (Exception e) {  
        // Message failed to process, redeliver later  
        consumer.negativeAcknowledge(msg);  
    }  
}

如果不想阻塞主线程,但又不断地监听新消息,可以考虑使用MessageListenerMessageListener使用客户机内部的线程池。您可以在ClientBuilder中设置消息侦听器的线程数。MessageListener将使用PulsarClient内部的线程池。您可以在ClientBuilder中设置用于消息侦听器的线程数。

java
MessageListener myMessageListener = (consumer, msg) -> {  
    try {  
        System.out.println("Message received: " + new String(msg.getData()));  
        consumer.acknowledge(msg);  
    } catch (Exception e) {  
        consumer.negativeAcknowledge(msg);  
    }  
}  
  
Consumer consumer = client.newConsumer()  
.topic("my-topic")  
.subscriptionName("my-subscription")  
.messageListener(myMessageListener)  
.subscribe();

reader

通过reader接口,pulsar客户端可以在主题中“手动定位”自己,并从指定的消息中读取所有消息。Java的Pulsar API允许您通过指定主题和MessageId来创建Reader对象。

示例如下:

java
// Some message ID byte array  
byte[] msgIdBytes = MessageId id = MessageId.fromByteArray(msgIdBytes);  
Reader reader = pulsarClient.newReader()  
    .topic(topic)  
    .startMessageId(id)  
    .create();  
while (true) {  
    Message message = reader.readNext();  
    // Process message  
}

在上面的例子中,为特定的Topic(按message ID)实例化了一个Reader对象,reader遍历主题中的每条消息(如何获得该值取决于应用程序)。 也可以使用MessageId.earliest指向关于MessageId主题的最早可用消息。MessageId.latest指向最近可用的消息。一般通过Reader进行消息的重放.