外观
pulsar
约 2151 字大约 7 分钟
2025-11-10
golang的sdk
ClientOptions
//为Pulsar服务配置服务URL。
//此参数是必需的
URL string
//TCP连接建立超时(默认值:5秒)
ConnectionTimeout time.Duration
//设置操作超时(默认值:30秒)
//Producer create、subscribe和unsubscribe操作将重试,直到该时间间隔结束,之后
//操作将被标记为失败
OperationTimeout time.Duration
//配置身份验证提供程序。(默认:无身份验证)
//示例:`Authentication:NewAuthenticationTLS(“my-cert.pem”、“my key.pem”)`
Authentication
//设置受信任的TLS证书文件的路径
TLSTrustCertsFilePath string
//配置Pulsar客户端是否接受来自代理的不受信任的TLS证书(默认值:false)
TLSAllowInsecureConnection bool
//配置Pulsar客户端是否从代理验证主机名的有效性(默认值:false)
TLSValidateHostname bool
//为vpc用户配置网络模型以连接pulsar broker
ListenerName string
//将保留在池中的单个代理的最大连接数。(默认值:1个连接)
MaxConnectionsPerBroker int
//配置客户端使用的记录器。
//默认情况下,一个包装好的logrus。将使用标准记录器,即,
//日志。NewLoggerWithLogrus(logrus.StandardLogger())
//FIXME:使用'logger'作为内部字段名,而不是'log',因为它更惯用
Logger log.Logger
//指定租户、命名空间或主题级别的度量基数,或将其完全删除。
//默认值:MetricsCardinalityNamespace
MetricsCardinality MetricsCardinality
//将自定义标签添加到此客户端实例报告的所有度量
CustomMetricsLabels map[string]stringProducerOptions
//主题指定制作人将发布的主题。
//构造生产者时需要此参数。
Topic string
//名称指定生产者的名称
//如果未分配,系统将生成一个全局唯一的名称,可以通过
//生产者。ProducerName()。
//指定名称时,由用户确保给定主题的制作人名称是唯一的
//在所有脉冲星的星团中。经纪人将强制规定,只有一个特定名称的制作人可以在网上发布
//一个话题。
Name string
//属性将一组应用程序定义的属性附加到生产者
//这些属性将在主题统计中可见
Properties map[string]string
//SendTimeout设置自发送后服务器未确认的消息的超时时间。
//Send和SendAsync在超时后返回错误。
//默认值为30秒,负值为-1表示禁用。
SendTimeout time.Duration
//DisableBlockIfQueueFull控制如果生产者的消息队列已满,是否发送和发送异步块。
//默认值为false,如果设置为true,则在队列已满时发送和发送异步返回错误。
DisableBlockIfQueueFull bool
//MaxPendingMessages设置等待接收消息的队列的最大大小
//经纪人的回执。
MaxPendingMessages int
//HashingScheme change“HashingScheme”用于选择发布特定消息的分区。
//可用的标准哈希函数有:
//
//-`JavaStringHash`:Java字符串。hashCode()等价
//-`Murruer3_32Hash`:使用Murruer3哈希函数。
// "https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash
//默认值为“JavaStringHash”。
HashingScheme
//CompressionType设置生产者的压缩类型。
//默认情况下,不会压缩消息有效负载。支持的压缩类型包括:
//-LZ4
//-ZLIB
//-ZSTD
//
//注:从Pulsar 2.3开始支持ZSTD。消费者至少需要做到这一点
//释放以便能够接收用ZSTD压缩的消息。
CompressionType
//定义所需的压缩级别。选项:
//-默认
//-更快
//-更好
CompressionLevel
//MessageRouter通过传递MessageRouter的实现来设置自定义消息路由策略
//路由器是一个函数,在给定特定消息和主题元数据后,返回
//消息应该路由到的分区索引
MessageRouter func(*ProducerMessage, TopicMetadata) int
//DisableBatching控制是否为生产者启用消息自动批处理。默认情况下,批处理
//已启用。
//启用批处理时,会多次调用生产者。sendAsync可以将单个批发送到
//代理,从而提高吞吐量,尤其是在发布小消息时。如果启用了压缩,
//消息将在批处理级别进行压缩,从而为类似的头文件或文件提供更好的压缩比
//内容。
//启用时,默认批处理延迟设置为1毫秒,默认批处理大小为1000条消息
//设置“DisableBatching:true”将使制作者单独发送消息
DisableBatching bool
//BatchingMaxPublishDelay设置对发送的消息进行批处理的时间段(默认值:10ms)
//如果批处理消息已启用。如果设置为非零值,消息将一直排队,直到此时
//间隔或直到
BatchingMaxPublishDelay time.Duration
//BatchingMaxMessages设置批处理中允许的最大消息数。(默认值:1000)
//如果设置为大于1的值,消息将排队,直到达到或超过此阈值
//已达到BatchingMaxSize(见下文)或批处理间隔已过。
BatchingMaxMessages uint
//BatchingMaxSize设置批处理中允许的最大字节数。(默认128 KB)
//如果设置为大于1的值,消息将排队,直到达到或超过此阈值
//已达到BatchingMaxMessages(见上文)或批处理间隔已过。
BatchingMaxSize uint
//作为一个拦截器链,这些拦截器将在ProducerInterceptor接口中定义的某些点被调用
Interceptors ProducerInterceptors
Schema Schema
//MaxReconnectToBroker设置重新连接Broker的最大重试次数。(默认值:ultimate)
MaxReconnectToBroker *uint
//BatcherBuilderType设置批处理生成器类型(默认为DefaultBatchBuilder)
//这将用于在启用批处理时创建批处理容器。
//选项:
//-DefaultBatchBuilder
//-KeyBasedBatchBuilder
BatcherBuilderType
//PartitionsAutoDiscoveryInterval是后台进程发现新分区的时间间隔
//默认值为1分钟
PartitionsAutoDiscoveryInterval time.Duration
//加密执行消息加密所需的字段
Encryption *ProducerEncryptionInfoConsumerOptions
//指定此消费者将订阅的主题。
//订阅时需要主题、主题列表或主题模式
Topic string
//指定此消费者将订阅的主题列表。
//订阅时需要主题、主题列表或主题模式
Topics []string
//指定正则表达式以订阅同一命名空间下的多个主题。
//订阅时需要主题、主题列表或主题模式
TopicsPattern string
//如果使用TopicsPattern,请指定轮询新分区或新主题的时间间隔。
AutoDiscoveryPeriod time.Duration
//指定此使用者的订阅名称
//订阅时需要此参数
SubscriptionName string
//将一组应用程序定义的属性附加到使用者
//这些属性将在主题统计中可见
Properties map[string]string
//选择订阅主题时要使用的订阅类型。
//默认为“独占”`
Type SubscriptionType
//订阅时光标将被设置的初始位置
//违约是最新的`
SubscriptionInitialPosition
//死信队列使用者策略的配置。
//在N次处理失败后,将消息路由到主题X
//默认情况下为零,没有DLQ
DLQ *DLQPolicy
//密钥共享使用者策略的配置。
KeySharedPolicy *KeySharedPolicy
//自动重试向默认填写的DLQPolicy主题发送消息
//默认值为false
RetryEnable bool
//为消费者设置“MessageChannel”
//当收到一条消息时,它将被推送到通道以供使用
MessageChannel chan ConsumerMessage
//设置使用者接收队列的大小。
//consumer receive queue(消费者接收队列)控制“消费者”在接收前可以累积多少条消息
//应用程序调用“消费者”。接收()。使用更高的价值可能会增加消费者
//以更大的内存利用率为代价的吞吐量。
//默认值是“1000”条消息,对于大多数用例应该是好的。
ReceiverQueueSize int
//延迟,在此延迟之后重新传递失败的消息
//已处理。默认值为1分钟。(参见'Consumer.Nack()')
NackRedeliveryDelay time.Duration
//设置消费者名称。
Name string
//如果启用,消费者将从压缩的主题中读取消息,而不是读取完整的消息积压
//关于这个话题。这意味着,如果主题被压缩,消费者只会看到主题的最新价值
//主题中的每个键,直到主题消息积压中压缩的点。除此之外
//此时,消息将正常发送。
//
//ReadCompacted只能用于订阅具有单个活动使用者(即。
//失败或独占订阅)。正在尝试在订阅非持久性主题或
//共享订阅将导致订阅调用引发PulsarClientException。
ReadCompacted bool
//将订阅标记为已复制,以使其在群集中保持同步
ReplicateSubscriptionState bool
//作为一个拦截器链,这些拦截器将在ConsumerInterceptor接口中定义的某些点被调用。
Interceptors ConsumerInterceptors
Schema Schema
//MaxReconnectToBroker设置重新连接Broker的最大重试次数。(默认值:ultimate)
MaxReconnectToBroker *uint
//用于解密加密消息的解密相关字段
Decryption *MessageDecryptionInfo