2025/8/4大约 1 分钟
生产者发送流程
类继承图

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
输入三个参数:
- 指定命名空间,默认是空
- 指定生产者组,即topic
- 指定rpcHook,在每条消息发生之前和之后会调用,做权限控制。流量统计。。。
DefaultMQProducerImpl
由上面源码可见,DefaultMQProducer构造的时候,创建了一个DefaultMQProducerImpl,实际发送消息的过程委托给这个DefaultMQProducerImpl来发送。

启动生产者DefaultMQProducer
/**
* Start this producer instance. </p>
*
* <strong> Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must
* to invoke this method before sending or querying messages. </strong> </p>
*
* @throws MQClientException if there is any unexpected error.
*/
@Override
public void start() throws MQClientException {
// 设置topic
this.setProducerGroup(withNamespace(this.producerGroup));
// 启动内置的生产者实现类
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
// trace 相关
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
1. 启动defaultMQProducerImpl
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 配置检查
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// MQClientInstance 他妈的叫个factory,你叫个 instance不好吗??
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.startScheduledTask();
}