-
Notifications
You must be signed in to change notification settings - Fork 166
Expand file tree
/
Copy pathMQClientFactory.h
More file actions
217 lines (185 loc) · 9.4 KB
/
MQClientFactory.h
File metadata and controls
217 lines (185 loc) · 9.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __MQCLIENTFACTORY_H__
#define __MQCLIENTFACTORY_H__
#include <boost/asio.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/thread/thread.hpp>
#include "FindBrokerResult.h"
#include "MQClientAPIImpl.h"
#include "MQClientException.h"
#include "MQConsumer.h"
#include "MQDecoder.h"
#include "MQMessageQueue.h"
#include "MQProducer.h"
#include "PermName.h"
#include "QueryResult.h"
#include "ServiceState.h"
#include "SocketUtil.h"
#include "TopicConfig.h"
#include "TopicRouteData.h"
namespace rocketmq {
//<!************************************************************************
// Forward declaration: within this header TopicPublishInfo only appears as
// boost::shared_ptr<TopicPublishInfo>, so an incomplete type is sufficient.
class TopicPublishInfo;
// Per-client hub object constructed with a client id. It declares:
//   - registration/lookup of producers and consumers by group name,
//   - topic-route, topic-publish-info and broker-address tables,
//   - admin-style queries (offsets, message lookup, topic creation),
//   - timer callbacks running on two boost::asio io_service instances.
// Only declarations are visible in this header; all behaviour is defined in
// the corresponding implementation file.
//
// NOTE(review): string, map, vector, set and unique_ptr are used unqualified
// and this header does not include <string>, <map>, <vector>, <set> or
// <memory> itself, so it depends on the project headers included above to
// supply both the includes and the using-declarations.
class MQClientFactory {
public:
// clientID is stored per factory (see m_clientId). The remaining arguments
// are presumably forwarded to the underlying MQClientAPIImpl -- TODO confirm;
// the units of the two timeouts are not visible from this header.
MQClientFactory(const string& clientID,
int pullThreadNum,
uint64_t tcpConnectTimeout,
uint64_t tcpTransportTryLockTimeout,
string unitName);
// Virtual so the factory can be destroyed through a base pointer by
// subclasses (protected members below indicate subclassing is intended).
virtual ~MQClientFactory();
// Lifecycle entry points; see m_serviceState for the tracked state.
void start();
void shutdown();
// Producer/consumer registration. Pointers are stored raw in the tables
// below; NOTE(review): the factory does not appear to take ownership --
// confirm against the implementation. The bool results presumably report
// whether the registration succeeded.
bool registerProducer(MQProducer* pProducer);
void unregisterProducer(MQProducer* pProducer);
bool registerConsumer(MQConsumer* pConsumer);
void unregisterConsumer(MQConsumer* pConsumer);
// Admin-style operations. Each takes the SessionCredentials to use for the
// request.
void createTopic(const string& key,
const string& newTopic,
int queueNum,
const SessionCredentials& session_credentials);
int64 minOffset(const MQMessageQueue& mq, const SessionCredentials& session_credentials);
int64 maxOffset(const MQMessageQueue& mq, const SessionCredentials& session_credentials);
int64 searchOffset(const MQMessageQueue& mq, int64 timestamp, const SessionCredentials& session_credentials);
int64 earliestMsgStoreTime(const MQMessageQueue& mq, const SessionCredentials& session_credentials);
// NOTE(review): returns a raw MQMessageExt*; ownership of the result is not
// visible from this header -- confirm whether the caller must delete it.
MQMessageExt* viewMessage(const string& msgId, const SessionCredentials& session_credentials);
QueryResult queryMessage(const string& topic,
const string& key,
int maxNum,
int64 begin,
int64 end,
const SessionCredentials& session_credentials);
// Transaction support.
void endTransactionOneway(const MQMessageQueue& mq,
EndTransactionRequestHeader* requestHeader,
const SessionCredentials& sessionCredentials);
void checkTransactionState(const std::string& addr,
const MQMessageExt& message,
const CheckTransactionStateRequestHeader& checkRequestHeader);
// Raw accessor for the API implementation held in m_pClientAPIImpl
// (a unique_ptr), so the returned pointer is presumably non-owning.
MQClientAPIImpl* getMQClientAPIImpl() const;
// Look up a registered producer/consumer by group name.
MQProducer* selectProducer(const string& group);
MQConsumer* selectConsumer(const string& group);
// Conversions from route data to publish/subscribe views of a topic.
boost::shared_ptr<TopicPublishInfo> topicRouteData2TopicPublishInfo(const string& topic, TopicRouteData* pRoute);
void topicRouteData2TopicSubscribeInfo(const string& topic, TopicRouteData* pRoute, vector<MQMessageQueue>& mqs);
// Broker address lookups.
// NOTE(review): the two FindBrokerResult* overloads return raw pointers;
// whether the caller owns the result cannot be determined from here.
FindBrokerResult* findBrokerAddressInSubscribe(const string& brokerName, int brokerId, bool onlyThisBroker);
FindBrokerResult* findBrokerAddressInAdmin(const string& brokerName);
string findBrokerAddressInPublish(const string& brokerName);
boost::shared_ptr<TopicPublishInfo> tryToFindTopicPublishInfo(const string& topic,
const SessionCredentials& session_credentials);
// Output is written into the caller-supplied mqs vector.
void fetchSubscribeMessageQueues(const string& topic,
vector<MQMessageQueue>& mqs,
const SessionCredentials& session_credentials);
// isDefault defaults to false; its exact meaning is defined by the
// implementation.
bool updateTopicRouteInfoFromNameServer(const string& topic,
const SessionCredentials& session_credentials,
bool isDefault = false);
// Rebalance, heartbeat and broker clean-up entry points.
void rebalanceImmediately();
void doRebalanceByConsumerGroup(const string& consumerGroup);
void sendHeartbeatToAllBroker();
void cleanOfflineBrokers();
// Output is written into the caller-supplied cids vector.
void findConsumerIds(const string& topic,
const string& group,
vector<string>& cids,
const SessionCredentials& session_credentials);
void resetOffset(const string& group, const string& topic, const map<MQMessageQueue, int64>& offsetTable);
// NOTE(review): raw ConsumerRunningInfo* result; ownership not visible here.
ConsumerRunningInfo* consumerRunningInfo(const string& consumerGroup);
// Fills sessionCredentials (out-parameter); the bool presumably reports
// whether a matching consumer was found -- TODO confirm.
bool getSessionCredentialFromConsumer(const string& consumerGroup, SessionCredentials& sessionCredentials);
// Broker address table access (see m_brokerAddrTable). getBrokerAddrMap
// returns the map by value, i.e. a copy.
void addBrokerToAddrMap(const string& brokerName, map<int, string>& brokerAddrs);
map<string, map<int, string>> getBrokerAddrMap();
void clearBrokerAddrMap();
bool isBrokerAddressInUse(const std::string& address);
private:
void unregisterClient(const string& producerGroup,
const string& consumerGroup,
const SessionCredentials& session_credentials);
// Topic route table helpers (see m_topicRouteTable, which stores raw
// TopicRouteData pointers).
TopicRouteData* getTopicRouteData(const string& topic);
void addTopicRouteData(const string& topic, TopicRouteData* pTopicRouteData);
// NOTE(review): returns a raw HeartbeatData*; ownership not visible here.
HeartbeatData* prepareHeartbeatData();
// startFetchNSService defaults to true.
void startScheduledTask(bool startFetchNSService = true);
//<! Timer callbacks. Each receives an error_code by non-const reference and
//<! a shared_ptr to the boost::asio::deadline_timer it is associated with.
void fetchNameServerAddr(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer> t);
void updateTopicRouteInfo(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer> t);
void timerCB_sendHeartbeatToAllBroker(boost::system::error_code& ec,
boost::shared_ptr<boost::asio::deadline_timer> t);
void timerCB_cleanOfflineBrokers(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer> t);
// Consumer-related operations.
void consumer_timerOperation();
void persistAllConsumerOffset(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer> t);
void doRebalance();
void timerCB_doRebalance(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer> t);
bool getSessionCredentialFromConsumerTable(SessionCredentials& sessionCredentials);
bool addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer);
void eraseConsumerFromTable(const string& consumerName);
int getConsumerTableSize();
// Output is written into the caller-supplied topicList set.
void getTopicListFromConsumerSubscription(set<string>& topicList);
// Note: mqs is taken by value, so each call copies the vector.
void updateConsumerSubscribeTopicInfo(const string& topic, vector<MQMessageQueue> mqs);
void insertConsumerInfoToHeartBeatData(HeartbeatData* pHeartbeatData);
// Producer-related operations.
bool getSessionCredentialFromProducerTable(SessionCredentials& sessionCredentials);
bool addProducerToTable(const string& producerName, MQProducer* pMQProducer);
void eraseProducerFromTable(const string& producerName);
int getProducerTableSize();
void insertProducerInfoToHeartBeatData(HeartbeatData* pHeartbeatData);
// TopicPublishInfo table operations (see m_topicPublishInfoTable).
void addTopicInfoToTable(const string& topic, boost::shared_ptr<TopicPublishInfo> pTopicPublishInfo);
void eraseTopicInfoFromTable(const string& topic);
bool isTopicInfoValidInTable(const string& topic);
boost::shared_ptr<TopicPublishInfo> getTopicPublishInfoFromTable(const string& topic);
// Output is written into the caller-supplied topicList set.
void getTopicListFromTopicPublishInfo(set<string>& topicList);
void getSessionCredentialsFromOneOfProducerOrConsumer(SessionCredentials& session_credentials);
protected:
// Exposed to subclasses. The API implementation and the remoting processor
// are owned by this object through unique_ptr.
string m_clientId;
unique_ptr<MQClientAPIImpl> m_pClientAPIImpl;
unique_ptr<ClientRemotingProcessor> m_pClientRemotingProcessor;
private:
string m_nameSrvDomain; // per clientId
ServiceState m_serviceState;
bool m_bFetchNSService;
//<! group --> MQProducer (raw pointers); the mutex declared alongside
//<! presumably guards this table.
typedef map<string, MQProducer*> MQPMAP;
boost::mutex m_producerTableMutex;
MQPMAP m_producerTable;
//<! group --> MQConsumer (raw pointers).
typedef map<string, MQConsumer*> MQCMAP;
// A recursive mutex is used here to avoid a deadlock issue.
// NOTE(review): the re-entrant call path that requires it is not visible in
// this header.
boost::recursive_mutex m_consumerTableMutex;
MQCMAP m_consumerTable;
//<! topic --> TopicRouteData (raw pointers).
typedef map<string, TopicRouteData*> TRDMAP;
boost::mutex m_topicRouteTableMutex;
TRDMAP m_topicRouteTable;
//<! brokerName --> (brokerId --> broker address)
boost::mutex m_brokerAddrlock;
typedef map<string, map<int, string>> BrokerAddrMAP;
BrokerAddrMAP m_brokerAddrTable;
//<! topic --> TopicPublishInfo (shared ownership via boost::shared_ptr)
typedef map<string, boost::shared_ptr<TopicPublishInfo>> TPMap;
boost::mutex m_topicPublishInfoTableMutex;
TPMap m_topicPublishInfoTable;
// NOTE(review): m_topicPublishInfoLock is distinct from
// m_topicPublishInfoTableMutex above; what each one protects is defined by
// the implementation and cannot be told from this header.
boost::mutex m_factoryLock;
boost::mutex m_topicPublishInfoLock;
// Two io_service instances, each declared together with a thread object;
// presumably each thread runs its io_service -- TODO confirm. Member order
// matters: members are destroyed in reverse declaration order.
boost::asio::io_service m_async_ioService;
unique_ptr<boost::thread> m_async_service_thread;
boost::asio::io_service m_consumer_async_ioService;
unique_ptr<boost::thread> m_consumer_async_service_thread;
};
} // namespace rocketmq
#endif