Skip to content

Commit

Permalink
Able to fuse remote results.
Browse files Browse the repository at this point in the history
  • Loading branch information
xuhao1 committed Oct 29, 2022
1 parent 4f0d49c commit ffa17ec
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
16 changes: 13 additions & 3 deletions scripts/taichislam_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from taichi_slam.mapping import *
from taichi_slam.utils.visualization import *
from taichi_slam.utils.ros_pcl_transfer import *
from taichi_slam.utils.communication import SLAMComm
from taichi_slam.utils.communication import *
import numpy as np
import rospy
from geometry_msgs.msg import PoseStamped
Expand Down Expand Up @@ -80,13 +80,17 @@ def init_params(self):
self.max_mesh = rospy.get_param('~disp/max_mesh', 1000000)

def send_submap_handle(self, buf):
self.comm.publishBuffer(buf)
self.comm.publishBuffer(buf, CHANNEL_SUBMAP)

def traj_send_handle(self, traj):
self.comm.publishBuffer(traj, CHANNEL_TRAJ)

def initial_networking(self):
if not self.enable_multi:
return
self.comm = SLAMComm(self.drone_id)
self.comm.on_submap = self.on_remote_submap
self.comm.on_traj = self.on_remote_traj

def handle_comm(self):
if not self.enable_multi:
Expand All @@ -95,6 +99,9 @@ def handle_comm(self):

def on_remote_submap(self, buf):
self.mapping.input_remote_submap(buf)

def on_remote_traj(self, buf):
self.mapping.input_remote_traj(buf)

def init_subscribers(self):
self.depth_sub = message_filters.Subscriber('~depth', Image, queue_size=10)
Expand Down Expand Up @@ -188,6 +195,7 @@ def initial_mapping(self):
if self.enable_mesher:
self.mesher = MarchingCubeMesher(self.mapping.submap_collection, self.max_mesh, tsdf_surface_thres=self.voxel_size*5)
self.mapping.map_send_handle = self.send_submap_handle
self.mapping.traj_send_handle = self.traj_send_handle
else:
if self.mapping_type == "octo":
opts = self.get_octo_opts()
Expand Down Expand Up @@ -348,9 +356,11 @@ def process_taichi(self):
self.render.set_drone_pose(0, pose[0], pose[1])
t_mesh, t_export, t_pubros = self.output(pose[0], pose[1])
self.count += 1
print(f"Time: pcl2npy {t_pcl2npy:.1f}ms t_recast {t_recast:.1f}ms ms t_export {t_export:.1f}ms t_mesh {t_mesh:.1f}ms t_pubros {t_pubros:.1f}ms")
print(f"[TaichiSLAM] Time: pcl2npy {t_pcl2npy:.1f}ms t_recast {t_recast:.1f}ms ms t_export {t_export:.1f}ms t_mesh {t_mesh:.1f}ms t_pubros {t_pubros:.1f}ms")

def traj_callback(self, traj):
if traj.drone_id != self.drone_id:
return
frame_poses = {}
positions = np.zeros((len(traj.poses), 3))
for i in range(len(traj.frame_ids)):
Expand Down
26 changes: 23 additions & 3 deletions taichi_slam/mapping/submap_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,23 @@ def set_export_submap(self, new_submap):
else:
self.export_x = new_submap.export_x

def set_frame_poses(self, frame_poses):
def set_frame_poses(self, frame_poses, from_remote=False):
s = time.time()
self.pgo_poses.update(frame_poses)
used_poses = {}
for frame_id in frame_poses:
if (self.last_frame_id is None or frame_id > self.last_frame_id) and frame_id in self.ego_motion_poses:
self.last_frame_id = frame_id
if frame_id in self.submaps:
R = frame_poses[frame_id][0]
T = frame_poses[frame_id][1]
self.global_map.set_base_pose_submap(self.submaps[frame_id], R, T)
used_poses[frame_id] = frame_poses[frame_id]
print(f"[SubmapMapping] Update frame poses from PGO cost {(time.time() - s)*1000:.1f}ms")

#We need to broadcast the poses to remote
if not from_remote:
self.send_traj(used_poses)

def create_new_submap(self, frame_id, R, T):
if self.first_init:
self.first_init = False
Expand Down Expand Up @@ -188,7 +193,15 @@ def send_submap(self, submap):
s = time.time()
compressed = zlib.compress(f.getbuffer(), level=1)
self.map_send_handle(compressed)
print(f"[SubmapMapping] Send submap with {len(f.getbuffer())/1024:.1f} kB, compressed {len(compressed)/1024:.1f}kB compress cost {(time.time() - s)*1000:.1f}ms")
print(f"[SubmapMapping] Send submap with {len(f.getbuffer())/1024.0:.1f} kB, compressed {len(compressed)/1024:.1f}kB compress cost {(time.time() - s)*1000:.1f}ms")

def send_traj(self, traj):
f = io.BytesIO()
np.save(f, traj)
s = time.time()
compressed = zlib.compress(f.getbuffer(), level=1)
self.traj_send_handle(compressed)
print(f"[SubmapMapping] Send traj with {len(f.getbuffer())/1024.0:.1f} kB, compressed {len(compressed)/1024:.1f}kB compress cost {(time.time() - s)*1000:.1f}ms")

def input_remote_submap(self, buf):
print(f"[SubmapMapping] Recv submap with {len(buf)/1024:.1f} kB")
Expand All @@ -201,6 +214,13 @@ def input_remote_submap(self, buf):
self.local_to_global()
self.submaps[submap["frame_id"]] = idx

def input_remote_traj(self, buf):
#decompress
decompress = zlib.decompress(buf)
f = io.BytesIO(decompress)
traj = np.load(f, allow_pickle=True).item()
self.set_frame_poses(traj, True)
print(f"[SubmapMapping] Recv traj with {len(traj)} poses {len(buf)/1024.0:.1f} kB")

def saveMap(self, filename):
self.global_map.saveMap(filename)
Expand Down
11 changes: 10 additions & 1 deletion taichi_slam/utils/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import random

CHANNEL_SUBMAP = "SUBMAP_CHANNEL"
CHANNEL_TRAJ = "TRAJ_CHANNEL"
TIMEOUT_MS = 10

class SLAMComm:
Expand All @@ -11,6 +12,7 @@ def __init__(self, drone_id = 0, lcm_url="udpm://224.0.0.251:7667?ttl=1"):
self.drone_id = drone_id
self.sent_msgs = set()
self.submap_sub = self.lcm.subscribe(CHANNEL_SUBMAP, self.handle_submap)
self.traj_sub = self.lcm.subscribe(CHANNEL_TRAJ, self.handle_traj)

def publishBuffer(self, buf, channel=CHANNEL_SUBMAP):
msg = Buffer()
Expand All @@ -22,13 +24,20 @@ def publishBuffer(self, buf, channel=CHANNEL_SUBMAP):
self.sent_msgs.add(msg.msg_id)
self.lcm.publish(channel, msg.encode())
print(f"Sent message on channel {channel} msg_id {msg.msg_id} size {len(msg.buffer)/1024:.1f} KB")

def handle_submap(self, channel, data):
msg = Buffer.decode(data)
if msg.msg_id in self.sent_msgs:
return
print(f"Received message on channel {channel} msg_id {msg.msg_id}")
self.on_submap(msg.buffer)

def handle_traj(self, channel, data):
msg = Buffer.decode(data)
if msg.msg_id in self.sent_msgs:
return
print(f"Received message on channel {channel} msg_id {msg.msg_id}")
self.on_traj(msg.buffer)

def handle(self):
self.lcm.handle_timeout(TIMEOUT_MS)

0 comments on commit ffa17ec

Please sign in to comment.