Skip to content

A process in the process pool was terminated abruptly while the future was running or pending. #8233

Open
@ghost

Description

❓ Questions and Help

I want to run PyTorch/XLA on a Kaggle TPU v3-8 and use all of the TPU's cores, but I always get `BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.`

Source code:

%%bash
# Install the video I/O, pose-detection and OpenMMLab packages used by the
# notebook, then download the two DWPose/YOLOX config files it loads by name.
# Abort at the first failing command so a broken install or download is not
# silently ignored by the later steps.
set -euo pipefail
# Quote the requirement so bash can never glob-expand the [ffmpeg] extra.
python3 -m pip install -U 'imageio[ffmpeg]' controlnet-aux openmim
mim install mmengine mmcv mmdet mmpose
# -f: fail on HTTP errors instead of saving the error page as the config file;
# -sS: hide the progress meter but still print errors; -L: follow redirects.
curl -fsSLO https://raw.githubusercontent.com/huggingface/controlnet_aux/master/src/controlnet_aux/dwpose/dwpose_config/dwpose-l_384x288.py
curl -fsSLO https://raw.githubusercontent.com/huggingface/controlnet_aux/master/src/controlnet_aux/dwpose/yolox_config/yolox_l_8xb8-300e_coco.py

import torch_xla, diffusers, builtins, imageio, os, PIL.Image, controlnet_aux, sys, torch
os.environ.pop('TPU_PROCESS_ADDRESSES', None)  # default of None: re-running the cell after the variable is already removed no longer raises KeyError
# Pose maps are computed once here in the parent; process() reads `poses` and `length` back through sys.modules['__main__'].
reader = imageio.get_reader('/kaggle/input/controlnet/pose.mp4', 'ffmpeg')
openpose = controlnet_aux.DWposeDetector(det_config='yolox_l_8xb8-300e_coco.py', pose_config='dwpose-l_384x288.py')
poses = [openpose(PIL.Image.fromarray(reader.get_data(frame_index)).resize((512, 768))) for frame_index in range(16)]  # only the first 16 frames; reader.count_frames() was the commented-out alternative
length = len(poses) // 8  # frames per worker slice; 8 is presumably one worker per TPU v3-8 core (confirm). Floor division: leftover frames fall outside every slice
fps = reader.get_meta_data().get('fps')  # None when the metadata has no 'fps' entry

def process(index):
    controlnet = diffusers.ControlNetModel.from_pretrained('lllyasviel/control_v11p_sd15_openpose', torch_dtype=torch.bfloat16)
    pipeline = diffusers.StableDiffusionControlNetPipeline.from_single_file('https://huggingface.co/chaowenguo/pal/blob/main/chilloutMix-Ni.safetensors', config='chaowenguo/stable-diffusion-v1-5', safety_checker=None, controlnet=controlnet, use_safetensors=True, torch_dtype=torch.bfloat16).to(torch_xla.core.xla_model.xla_device())
    pipeline.scheduler = diffusers.DPMSolverMultistepScheduler.from_config(pipeline.scheduler.config)
    pipeline.enable_attention_slicing()
    pipeline.unet.set_attn_processor(diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.CrossFrameAttnProcessor())
    pipeline.controlnet.set_attn_processor(diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.CrossFrameAttnProcessor())
    pose = sys.modules['__main__'].poses[index * sys.modules['__main__'].length:(index + 1) * sys.modules['__main__'].length]
    imageio.mimsave(f'{index}.mp4', pipeline(prompt=['gorgeous slim young cleavage robust boob japanese girl, wearing white deep V bandeau pantie, smile lying on white bed, best quality, extremely detailed'] * builtins.len(pose), negative_prompt=['monochrome, lowres, bad anatomy, worst quality, low quality'] * builtins.len(pose), image=pose, num_inference_steps=20, latents=torch.randn((1, 4, 96, 64), device=torch_xla.core.xla_model.xla_device(), dtype=torch.bfloat16).repeat(builtins.len(pose), 1, 1, 1)).images, fps=fps)

torch_xla.distributed.xla_multiprocessing.spawn(process, start_method='fork')

and I get:

BrokenProcessPool                         Traceback (most recent call last)
Cell In[2], line 20
     17     pose = sys.modules['__main__'].poses[index * sys.modules['__main__'].length:(index + 1) * sys.modules['__main__'].length]
     18     imageio.mimsave(f'{index}.mp4', pipeline(prompt=['gorgeous slim young cleavage robust boob japanese girl, wearing white deep V bandeau pantie, smile lying on white bed, best quality, extremely detailed'] * builtins.len(pose), negative_prompt=['monochrome, lowres, bad anatomy, worst quality, low quality'] * builtins.len(pose), image=pose, num_inference_steps=20, latents=torch.randn((1, 4, 96, 64), device=torch_xla.core.xla_model.xla_device(), dtype=torch.bfloat16).repeat(builtins.len(pose), 1, 1, 1)).images, fps=fps)
---> 20 torch_xla.distributed.xla_multiprocessing.spawn(process, start_method='fork')
     21 result = []
     22 for _ in builtins.range(8):

File /usr/local/lib/python3.10/site-packages/torch_xla/runtime.py:95, in requires_pjrt.<locals>.wrapper(*args, **kwargs)
     91 if not using_pjrt():
     92   raise NotImplementedError('`{}` not implemented for XRT'.format(
     93       fn.__name__))
---> 95 return fn(*args, **kwargs)

File /usr/local/lib/python3.10/site-packages/torch_xla/distributed/xla_multiprocessing.py:38, in spawn(fn, args, nprocs, join, daemon, start_method)
      6 @xr.requires_pjrt
      7 def spawn(fn,
      8           args=(),
   (...)
     11           daemon=False,
     12           start_method='spawn'):
     13   """Enables multi processing based replication.
     14 
     15   Args:
   (...)
     36     return None.
     37   """
---> 38   return pjrt.spawn(fn, nprocs, start_method, args)

File /usr/local/lib/python3.10/site-packages/torch_xla/_internal/pjrt.py:214, in spawn(fn, nprocs, start_method, args)
    211 elif nprocs is not None:
    212   logging.warning('Unsupported nprocs (%d), ignoring...' % nprocs)
--> 214 run_multiprocess(spawn_fn, start_method=start_method)

File /usr/local/lib/python3.10/site-packages/torch_xla/runtime.py:95, in requires_pjrt.<locals>.wrapper(*args, **kwargs)
     91 if not using_pjrt():
     92   raise NotImplementedError('`{}` not implemented for XRT'.format(
     93       fn.__name__))
---> 95 return fn(*args, **kwargs)

File /usr/local/lib/python3.10/site-packages/torch_xla/_internal/pjrt.py:174, in run_multiprocess(fn, start_method, *args, **kwargs)
    168   mp_fn = functools.partial(
    169       _run_thread_per_device,
    170       local_world_size=num_processes,
    171       fn=functools.partial(fn, *args, **kwargs),
    172       initializer_fn=initialize_multiprocess)
    173   process_results = executor.map(mp_fn, range(num_processes))
--> 174   replica_results = list(
    175       itertools.chain.from_iterable(
    176           result.items() for result in process_results))
    178 return _merge_replica_results(replica_results)

File /usr/local/lib/python3.10/site-packages/torch_xla/_internal/pjrt.py:175, in <genexpr>(.0)
    168   mp_fn = functools.partial(
    169       _run_thread_per_device,
    170       local_world_size=num_processes,
    171       fn=functools.partial(fn, *args, **kwargs),
    172       initializer_fn=initialize_multiprocess)
    173   process_results = executor.map(mp_fn, range(num_processes))
    174   replica_results = list(
--> 175       itertools.chain.from_iterable(
    176           result.items() for result in process_results))
    178 return _merge_replica_results(replica_results)

File /usr/local/lib/python3.10/concurrent/futures/process.py:575, in _chain_from_iterable_of_lists(iterable)
    569 def _chain_from_iterable_of_lists(iterable):
    570     """
    571     Specialized implementation of itertools.chain.from_iterable.
    572     Each item in *iterable* should be a list.  This function is
    573     careful not to keep references to yielded objects.
    574     """
--> 575     for element in iterable:
    576         element.reverse()
    577         while element:

File /usr/local/lib/python3.10/concurrent/futures/_base.py:621, in Executor.map.<locals>.result_iterator()
    618 while fs:
    619     # Careful not to keep a reference to the popped future
    620     if timeout is None:
--> 621         yield _result_or_cancel(fs.pop())
    622     else:
    623         yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

File /usr/local/lib/python3.10/concurrent/futures/_base.py:319, in _result_or_cancel(***failed resolving arguments***)
    317 try:
    318     try:
--> 319         return fut.result(timeout)
    320     finally:
    321         fut.cancel()

File /usr/local/lib/python3.10/concurrent/futures/_base.py:458, in Future.result(self, timeout)
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()

File /usr/local/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Running a single process on a single TPU core works well, but running multiple processes to use all of the TPU's cores does not work.

Please help.

You can copy and test my code at https://www.kaggle.com/code/chaowenguoback/stablediffusion. Please help.

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions