|
42 | 42 | socket.send_string("World!") |
43 | 43 |
|
44 | 44 | ### 1.2. Publisher-Subscriber(发布-订阅模式) |
| 45 | +* Publisher-Subscriber模式,消息是单向流动的,发布者只能发布消息,不能接受消息;订阅者只能接受消息,不能发送消息。 |
| 46 | +* 服务端发布消息过程中,如果有订阅者退出,不影响发布者继续发布消息,当订阅者再次连接上来,收到的消息是后来发布的消息。 |
| 47 | +* 比较晚加入的订阅者,或者中途离开的订阅者,必然会丢掉一部分信息。 |
| 48 | +* 如果发布者停止,所有订阅者会阻塞,等发布者再次上线的时候会继续接受消息。 |
| 49 | +* 订阅者必须使用zmq_setsockopt()方法来设置订阅的内容,否则将收不到任何消息。 |
| 50 | +* “慢连接”:我们不知道订阅者是何时开始接收消息的,就算先启动“订阅者”,再启动“发布者”,“订阅者”还是会缺失一部分消息,**因为建立连接是需要时间的,虽然很短,但不是零。ZMQ在后台是进行异步的IO传输,在建立TCP连接的短短的时间段内,ZMQ就可以发送很多消息了。** |
| 51 | +* 有种简单的方法来同步“发布者”和“订阅者”,通过sleep让发布者延迟发送消息,等连接建立完成后再进行发送。 |
| 52 | + |
| 53 | +#### Publisher.py |
| 54 | + |
| 55 | + import zmq |
| 56 | + import time |
| 57 | + import random |
| 58 | + |
| 59 | + context = zmq.Context() |
| 60 | + socket = context.socket(zmq.PUB) |
| 61 | + socket.bind("tcp://*:5555") |
| 62 | + |
| 63 | + if __name__ == '__main__': |
| 64 | + print("发布者启动.....") |
| 65 | + time.sleep(2) |
| 66 | + for i in range(1000): |
| 67 | + tempterature = random.randint(-10, 40) |
| 68 | + message = "我是publisher, 这是我发布给你们的第{}个消息!今日温度{}".format(i+1, tempterature) |
| 69 | + socket.send_string(message) |
| 70 | + |
| 71 | +#### Subscriber.py |
| 72 | + |
| 73 | + import zmq |
| 74 | + |
| 75 | + context = zmq.Context() |
| 76 | + socket = context.socket(zmq.SUB) |
| 77 | + socket.connect("tcp://localhost:5555") |
| 78 | + |
| 79 | + # 客户端需要设定一个过滤,否则收不到任何信息 |
| 80 | + socket.setsockopt_string(zmq.SUBSCRIBE, '') |
| 81 | + |
| 82 | + if __name__ == '__main__': |
| 83 | + print('订阅者一号启动....') |
| 84 | + while True: |
| 85 | + message = socket.recv_string() |
| 86 | + print("(订阅者一号)接收到'发布者'发送的消息:{}".format(message)) |
0 commit comments