Skip to content

Commit c31733c

Browse files
committed
add args field for python callback
1 parent 7bd61c5 commit c31733c

7 files changed

Lines changed: 53 additions & 35 deletions

File tree

.gitignore

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,8 @@
11
.idea/
2-
cmake-build-debug/
2+
cmake-build-*/
3+
4+
*.pyc
5+
*.so
6+
7+
bin/
8+

doc/Introduction.md

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,10 @@
7373
----------
7474
## How to use
7575
- set LD_LIBRARY_PATH
76-
``````
76+
```
7777
export LD_LIBRARY_PATH=/usr/local/lib
78+
```
79+
7880
- import module
7981
```
8082
from librocketmqclientpython import *
@@ -90,28 +92,28 @@
9092
- producer must invoke following interface:
9193
```
9294
- producer = CreateProducer("please_rename_unique_group_name");
93-
- SetProducerNameServerAddress(producer,"please_rename_unique_name_server")
95+
- SetProducerNameServerAddress(producer, "please_rename_unique_name_server")
9496
- StartProducer(producer)
95-
- SendMessageSync(producer,msg)
97+
- SendMessageSync(producer, msg)
9698
- ShutdownProducer(producer)
9799
- DestroyProducer(producer)
98100
```
99101
- how to consumer messages
100102
```
101-
- def consumerMessage(msg):
102-
- topic = GetMessageTopic(msg)
103-
- body = GetMessageBody(msg)
104-
- tags = GetMessageTags(msg)
105-
- msgid = GetMessageId(msg)
106-
- handle message
107-
- return 0
103+
- def consumerMessage(msg, args):
104+
- topic = GetMessageTopic(msg)
105+
- body = GetMessageBody(msg)
106+
- tags = GetMessageTags(msg)
107+
- msgid = GetMessageId(msg)
108+
- # handle message...
109+
- return 0
108110
```
109111
- pushconsumer must invoke following interface:
110112
```
111113
- consumer = CreatePushConsumer("please_rename_unique_group_name_1");
112-
- SetPushConsumerNameServerAddress(consumer,"please_rename_unique_name_server")
114+
- SetPushConsumerNameServerAddress(consumer, "please_rename_unique_name_server")
113115
- Subscribe(consumer, "your_topic", "*")
114-
- RegisterMessageCallback(consumer,consumerMessage)
116+
- RegisterMessageCallback(consumer, consumerMessage, args)
115117
- StartPushConsumer(consumer)
116118
- ShutdownPushConsumer(consumer)
117119
- DestroyPushConsumer(consumer)
@@ -122,3 +124,4 @@
122124
- python testProducer.py
123125
- push consumer
124126
- python testConsumer.py
127+

doc/api-doc/consumer-push.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,14 @@
3131
topic: topic name
3232
tag: topic tag
3333

34-
* RegisterMessageCallback(consumer, pyCallBack) <br />
34+
* RegisterMessageCallback(consumer, pyCallBack, pyArgs) <br />
3535
- function description<br />
3636
set callback for push consumer instance <br />
3737

3838
- input <br />
3939
consumer: consumer intance<br />
40-
pyCallBack: py callback method. when message pulled, they would be send to a pyCallback method
40+
pyCallBack: py callback method. when message pulled, they would be send to a pyCallback method<br />
41+
pyArgs: the arguments will be passed to pyCallBack
4142

4243
* SetPushConsumerThreadCount(consumer, threadCount)
4344
- function description<br />

sample/testConsumer.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
import base
1919
import time
2020
from librocketmqclientpython import *
21+
2122
totalMsg = 0
22-
def consumerMessage(msg):
23+
24+
def consumerMessage(msg, args):
2325
global totalMsg
2426
totalMsg += 1
2527
print(">>ConsumerMessage Called:",totalMsg)
@@ -33,11 +35,12 @@ def consumerMessage(msg):
3335

3436
consumer = CreatePushConsumer("awtTest_Producer_Python_Test")
3537
print(consumer)
36-
SetPushConsumerNameServerAddress(consumer,"172.17.0.2:9876")
37-
SetPushConsumerThreadCount(consumer,1)
38+
SetPushConsumerNameServerAddress(consumer, "172.17.0.2:9876")
39+
SetPushConsumerThreadCount(consumer, 1)
3840
Subscribe(consumer, "T_TestTopic", "*")
39-
RegisterMessageCallback(consumer,consumerMessage)
41+
RegisterMessageCallback(consumer, consumerMessage, None)
4042
StartPushConsumer(consumer)
43+
4144
i = 1
4245
while i <= 60:
4346
print(i)

src/PythonWrapper.cpp

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ using namespace std;
3030
const char *VERSION =
3131
"PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " ";
3232

33-
map<CPushConsumer *, PyObject *> g_CallBackMap;
33+
map<CPushConsumer *, pair<PyObject *, object>> g_CallBackMap;
3434

3535
class PyThreadStateLock {
3636
public:
@@ -163,7 +163,14 @@ void *PyCreatePushConsumer(const char *groupId) {
163163
return (void *) CreatePushConsumer(groupId);
164164
}
165165
int PyDestroyPushConsumer(void *consumer) {
166-
return DestroyPushConsumer((CPushConsumer *) consumer);
166+
CPushConsumer *consumerInner = (CPushConsumer *) consumer;
167+
map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
168+
iter = g_CallBackMap.find(consumerInner);
169+
if (iter != g_CallBackMap.end()) {
170+
UnregisterMessageCallback(consumerInner);
171+
g_CallBackMap.erase(iter);
172+
}
173+
return DestroyPushConsumer(consumerInner);
167174
}
168175
int PyStartPushConsumer(void *consumer) {
169176
return StartPushConsumer((CPushConsumer *) consumer);
@@ -181,29 +188,27 @@ int PySetPushConsumerNameServerDomain(void *consumer, const char *domain){
181188
int PySubscribe(void *consumer, const char *topic, const char *expression) {
182189
return Subscribe((CPushConsumer *) consumer, topic, expression);
183190
}
184-
int PyRegisterMessageCallback(void *consumer, PyObject *pCallback) {
191+
int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args) {
185192
CPushConsumer *consumerInner = (CPushConsumer *) consumer;
186-
g_CallBackMap[consumerInner] = pCallback;
193+
g_CallBackMap[consumerInner] = make_pair(pCallback, std::move(args));
187194
return RegisterMessageCallback(consumerInner, &PythonMessageCallBackInner);
188195
}
189196

190197
int PythonMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg) {
191-
192-
class PyThreadStateLock PyThreadLock;
193-
PyMessageExt message;
194-
message.pMessageExt = msg;
195-
map<CPushConsumer *, PyObject *>::iterator iter;
198+
PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback
199+
PyMessageExt message = { .pMessageExt = msg };
200+
map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
196201
iter = g_CallBackMap.find(consumer);
197202
if (iter != g_CallBackMap.end()) {
198-
PyObject * pCallback = iter->second;
203+
pair<PyObject *, object> callback = iter->second;
204+
PyObject * pCallback = callback.first;
205+
object& args = callback.second;
199206
if (pCallback != NULL) {
200-
int status =
201-
boost::python::call<int>(pCallback, message);
207+
int status = boost::python::call<int>(pCallback, message, args);
202208
return status;
203209
}
204210
}
205211
return 1;
206-
207212
}
208213

209214
int PySetPushConsumerThreadCount(void *consumer, int threadCount) {

src/PythonWrapper.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ int PyShutdownPushConsumer(void *consumer);
8989
int PySetPushConsumerNameServerAddress(void *consumer, const char *namesrv);
9090
int PySetPushConsumerNameServerDomain(void *consumer, const char *domain);
9191
int PySubscribe(void *consumer, const char *topic, const char *expression);
92-
int PyRegisterMessageCallback(void *consumer, PyObject *pCallback);
92+
int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args);
9393
int PythonMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg);
9494
int PySetPushConsumerThreadCount(void *consumer, int threadCount);
9595
int PySetPushConsumerMessageBatchMaxSize(void *consumer, int batchSize);

test/TestConsumeMessages.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def sigint_handler(signum, frame):
3434
sys.exit(0)
3535

3636

37-
def consumer_message(msg):
37+
def consumer_message(msg, args):
3838
global totalMsg
3939
totalMsg += 1
4040
print 'total count %d' % totalMsg
@@ -55,7 +55,7 @@ def init_producer(_group, _topic, _tag):
5555
SetPushConsumerNameServerAddress(consumer, name_srv)
5656
SetPushConsumerThreadCount(consumer, 1)
5757
Subscribe(consumer, _topic, _tag)
58-
RegisterMessageCallback(consumer, consumerMessage)
58+
RegisterMessageCallback(consumer, consumer_message, None)
5959
StartPushConsumer(consumer)
6060
print 'consumer is ready...'
6161
return consumer

0 commit comments

Comments
 (0)