@@ -30,7 +30,7 @@ using namespace std;
3030const char *VERSION =
3131 " PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION " , BUILD DATE: " PYCLI_BUILD_DATE " " ;
3232
33- map<CPushConsumer *, PyObject *> g_CallBackMap;
33+ map<CPushConsumer *, pair< PyObject *, object> > g_CallBackMap;
3434
3535class PyThreadStateLock {
3636public:
@@ -163,7 +163,14 @@ void *PyCreatePushConsumer(const char *groupId) {
163163 return (void *) CreatePushConsumer (groupId);
164164}
165165int PyDestroyPushConsumer (void *consumer) {
166- return DestroyPushConsumer ((CPushConsumer *) consumer);
166+ CPushConsumer *consumerInner = (CPushConsumer *) consumer;
167+ map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
168+ iter = g_CallBackMap.find (consumerInner);
169+ if (iter != g_CallBackMap.end ()) {
170+ UnregisterMessageCallback (consumerInner);
171+ g_CallBackMap.erase (iter);
172+ }
173+ return DestroyPushConsumer (consumerInner);
167174}
168175int PyStartPushConsumer (void *consumer) {
169176 return StartPushConsumer ((CPushConsumer *) consumer);
@@ -181,29 +188,27 @@ int PySetPushConsumerNameServerDomain(void *consumer, const char *domain){
181188int PySubscribe (void *consumer, const char *topic, const char *expression) {
182189 return Subscribe ((CPushConsumer *) consumer, topic, expression);
183190}
184- int PyRegisterMessageCallback (void *consumer, PyObject *pCallback) {
191+ int PyRegisterMessageCallback (void *consumer, PyObject *pCallback, object args ) {
185192 CPushConsumer *consumerInner = (CPushConsumer *) consumer;
186- g_CallBackMap[consumerInner] = pCallback;
193+ g_CallBackMap[consumerInner] = make_pair ( pCallback, std::move (args)) ;
187194 return RegisterMessageCallback (consumerInner, &PythonMessageCallBackInner);
188195}
189196
190197int PythonMessageCallBackInner (CPushConsumer *consumer, CMessageExt *msg) {
191-
192- class PyThreadStateLock PyThreadLock;
193- PyMessageExt message;
194- message.pMessageExt = msg;
195- map<CPushConsumer *, PyObject *>::iterator iter;
198+ PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback
199+ PyMessageExt message = { .pMessageExt = msg };
200+ map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
196201 iter = g_CallBackMap.find (consumer);
197202 if (iter != g_CallBackMap.end ()) {
198- PyObject * pCallback = iter->second ;
203+ pair<PyObject *, object> callback = iter->second ;
204+ PyObject * pCallback = callback.first ;
205+ object& args = callback.second ;
199206 if (pCallback != NULL ) {
200- int status =
201- boost::python::call<int >(pCallback, message);
207+ int status = boost::python::call<int >(pCallback, message, args);
202208 return status;
203209 }
204210 }
205211 return 1 ;
206-
207212}
208213
209214int PySetPushConsumerThreadCount (void *consumer, int threadCount) {
0 commit comments