Jacob Tomlinson's profile picture Jacob Tomlinson
Home Blog Talks Newsletter About

Debugging Data Science workflows at scale

15 minute read #python, #dask, #kubernetes, #apache-beam, #google-cloud, #google-kubernetes-engine

The more we scale up our workloads the more we run into bugs that only appear at scale. Reproducing these bugs can be expensive, time consuming and error prone. In order to report a bug on a GitHub repo you generally need to isolate the bug and come up with a minimal reproducer so that the maintainer can investigate. But what if a minimal reproducer requires hundreds of servers to isolate and replicate?

This week I spent some time debugging a problem that was discovered when running Apache Beam, on a Dask cluster with hundreds of GPUs, on an batch compute cluster at NVIDIA. I thought it might be interesting to write up my debugging process for looking into bugs at this scale.

The problem

This bug came to my attention when some Data Scientists approached me and asked why Dask was failing to scale into the hundreds of workers. They reported that when scaling to hundreds of GPUs they could see in the telemetry that many GPUs were sitting idle.

They had spent some time digging into the problem and found that jobs with 10, 50 or 100 GPU workers would fully utilize the cluster. However when they went to 200 workers and beyond they would lose performance and many of them would sit idle and the Dask dashboard showed no tasks being assigned to those workers.

Collecting information

The first things I needed to figure out were what did the workflow look like and what software, tools and platforms were being used. They had approached me as a Dask maintainer because they could see the problem in the HPC telemetry and the Dask dashboard, but it wasn’t clear whether Dask was causing the problem.

I discovered that the workflow was using Apache Beam with the Dask Runner. The Dask cluster was being created as an HPC-style batch job on a large on-prem compute cluster. The workflow read in some files from shared storage and used Beam to map functions onto those files and write the output back to shared storage.

They had very helpfully put together a simplified version of their workload which created some temp files (one per worker), created a collection with beam.Create and performa and element-wise operation onto it with beam.ParDo. This function read in the input file, used torch to simulate some GPU intensive compute for a few seconds and then wrote the same file back to disk. They had run this simplified workload a few times and had seen the same behavior of idle workers at large scale.

This was a great start because they had recreated the same IO, compute and scaling setup they were using in their project, but they had removed all of the business logic. This made it much easier for me to dig into solving the problem without having to understand their project.

Isolating variables

My next steps were to continue boiling their reproducer down as much as possible by isolating the different components and removing them one by one.

I wanted to remove GPUs from the equation if possible. Having workers sitting idle suggested a graph problem or a scheduling problem and that the compute being done was probably unrelated. Removing torch from the reproducer would mean that my workers could be much smaller and cheaper.

It was unclear to me whether PyTorch was part of the stack they were using in their real workload or whether they were just using it to highlight which workers had 100% GPU utilization and which had 0%. So my first step was to replace the ParDo operation with one that just sleeps, this removed torch from the dependencies altogether. I was confident that I could spot idle workers another way.

I removed all of the file generation and IO operations from the example as I didn’t think that would be causing worker idleness. This was just a gut feeling more than anything and I was totally prepared to add this back in.

My plan was to run a Dask cluster and then use Apache Beam to map a no-op function over a list of integers at a large scale and see if I saw the same behavior. So the next thing I needed to isolate was the compute environment.

I wanted to avoid opening an issue on Dask or Beam (the two most likely candidates for the bug) and say that I had a problem that I could only reproduce on an on-prem cluster that the other maintainers of those projects didn’t have access to.

Cost effective scale

I needed a Dask cluster with a few hundred workers to try and replicate this. The first thing I tried was dask.distributed.LocalCuster(n_workers=250) but I quickly ran into problems with the workers and scheduler timing out and losing connection to each other. My local machine was overwhelmed with the overhead of such a large cluster before even trying to run anything.

So I turned to public cloud to run a large cluster for a few hours while digging into the problem.

I chose Google Cloud to run this workload because I had access already configured on my laptop and I knew that if the reproducer needed large scale it was common enough that other folks would be able to reproduce my findings.

I also chose Google Kubernetes Engine to manage the cluster because I wasn’t sure whether the problem was coming from the number of workers in the cluster, the number of items Beam was processing, or both. I wanted to be able to grab some resources and quickly start/stop ephemeral Dask clusters with varying numbers of workers within it to try things out at different scales, but without losing my resources of having to wait for nodes to provision.

I had a quick look at my account quotas and saw that I could provision up to 500 CPUs in the N2 family of instances. I wanted each worker to have a dedicated CPU core in case I did need to fully load them to reproduce the problem so I wanted a few hundred CPU cores.

I launched a 5 node cluster of c2-standard-60 instances. Each instance had 60 CPU cores, giving me 300 total which should be enough to reproduce my problem.

$ gcloud container clusters create jtomlinson-beam-test \
    --machine-type c2-standard-60 --spot \
    --zone us-central1-c --release-channel stable \
    --num-nodes 5

$ gcloud container clusters get-credentials jtomlinson-beam-test \
    --zone us-central1-c
Setting the --spot flag helped keep costs down at the expense of potentially losing my cluster part way through, luckily this didn’t happen. At the time of writing the spot price for c2-standard-60 was around $0.80 so my 5 node cluster would cost around $4/hour.

Then I installed the Dask Kubernetes Operator to manage launching Dask clusters.

$ helm install --repo https://helm.dask.org \
    --create-namespace -n dask-operator \
    --generate-name dask-kubernetes-operator

After waiting 5-10 mins for my cluster to launch I could begin debugging.

Debugging

The minimal example I had been given was quite long and still included the PyTorch code. It also imported functions from a few utility libraries in the project that it had been extracted from that were being used to connect to the Dask cluster and configure the Beam runner. I had never used Beam before so decided to dig into their documentation and figure out what was the most minimal script I could put together to run some Beam pipeline. So I started from scratch and I ended up with something like this.

import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.dask.dask_runner import DaskRunner

from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster

class NoopDoFn(beam.DoFn):
    def process(self, item):
        time.sleep(10)
        return [item]

def main() -> None:
    n_items = 128
    n_workers = 128

    # Create a Dask Cluster
    cluster = KubeCluster(name="beam-test", n_workers=n_workers)
    client = Client(cluster)
    client.wait_for_workers(n_workers)
    print(f"Dashboard at: {cluster.dashboard_link}")

    # Start a beam pipeline with a dask backend, and its options.
    print("Running Pipeline")
    pipeline = beam.Pipeline(
        runner=DaskRunner(),
        options=PipelineOptions(
            ["--dask_client_address", cluster.scheduler_address]
        ),
    )
    (
        pipeline
        | "Create collection" >> beam.Create(range(n_items))
        | "Noop 1" >> beam.ParDo(NoopDoFn())
        | "Noop 2" >> beam.ParDo(NoopDoFn())
        | "Noop 3" >> beam.ParDo(NoopDoFn())
    )
    result = pipeline.run()
    result.wait_until_finish()

    # Clean up
    client.close()
    cluster.close()


if __name__ == "__main__":
    main()

This script allowed me to quickly run a Dask cluster on Kubernetes with KubeCluster and then execute a Beam pipeline on it, then clean up the cluster again.

The two variables I wanted to explore were the number of items in the Beam pipeline and the number of workers in my Dask cluster. Making those actual variables at the top of my main() function allowed me to tweak those, run the script and watch the dashboard.

Using KubeCluster meant that I didn’t need to care about how to access Dask within my Kubenetes cluster. It automatically port-forwarded the Dask scheduler to my laptop so I could just print the dashboard link in my script and click on it in the terminal.

Having my Kubernetes cluster running in the cloud with 300 available cores allowed me to run my script over and over with 10, 50, 100, 150, 200, 250, 300, etc workers without having to wait more than a couple of seconds for the Dask cluster to setup.

Reducing noise

Next was enhancing up my debug script so that I could repeatedly reproduce my workload and remove as much noise as possible. At this point it’s fine for the script to grow and get more complex, we can cut it down again later.

The first few times I ran my script I was seeing a lot of warning output that was unrelated to what I was testing so I updated my script a little to suppress those.

I kept getting errors from the Dask scheduler saying that it couldn’t find apache-beam due to a recent change in Dask which requires the scheduler to have a consistent software environment with the client machine. I set an environment variable EXTRA_PIP_PACKAGES to ensure this was installed at runtime.

Installing packages at runtime slowed the cluster startup by a few seconds so I also decided to set KubeCluster(..., shutdown_on_close=False) and remove the cluster.close() so that the Dask cluster wouldn’t be deleted at the end of the script, and added a cluster.scale(n_workers) call to ensure the cluster was the right size if the script reused an existing one.

I was able to quickly clean up the Dask cluster with kubectl delete daskcluster beam-test between runs if I wanted a fresh start.

I noticed that my workers had unbounded resources and were assuming they each had the whole node to play with, so I also added some resource constraints to the worker Pods to limit them to one CPU each.

Finally I moved the Dask cluster setup/teardown logic into a context manager to separate the Beam code from the Dask code. I ended up with something like this.

import warnings
import time
from contextlib import contextmanager

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.dask.dask_runner import DaskRunner

from dask.distributed import Client
from distributed.versions import VersionMismatchWarning
from dask_kubernetes.operator import KubeCluster

# Reduce output noise
for warn_type in [VersionMismatchWarning, FutureWarning]:
    warnings.filterwarnings("ignore", category=warn_type)


@contextmanager
def daskcluster():
    """Get a Dask cluster however you prefer."""
    n_workers = 200
    with KubeCluster(
        name="beam-test",
        n_workers=n_workers,
        env={"EXTRA_PIP_PACKAGES": "apache-beam"},
        resources={
            "requests": {"cpu": "500m", "memory": "1Gi"},
            "limits": {"cpu": "1000m", "memory": "1.85Gi"},
        },
        shutdown_on_close=False,  # Leave running for reuse next time
    ) as cluster:
        cluster.scale(
            n_workers
        )  # Ensure the right number of workers if reusing a cluster
        print(f"Dashboard at: {cluster.dashboard_link}")
        with Client(cluster) as client:
            print(f"Waiting for all {n_workers} workers")
            client.wait_for_workers(n_workers=n_workers)
            yield client


class NoopDoFn(beam.DoFn):
    def process(self, item):
        time.sleep(10)
        return [item]


def main() -> None:
    n_items = 200

    with daskcluster() as client:
        # Start a beam pipeline with a dask backend, and its options.
        print("Running Pipeline")
        pipeline = beam.Pipeline(
            runner=DaskRunner(),
            options=PipelineOptions(
                ["--dask_client_address", client.cluster.scheduler_address]
            ),
        )
        (
            pipeline
            | "Create collection" >> beam.Create(range(n_items))
            | "Noop 1" >> beam.ParDo(NoopDoFn())
            | "Noop 2" >> beam.ParDo(NoopDoFn())
            | "Noop 3" >> beam.ParDo(NoopDoFn())
        )

        result = pipeline.run()
        result.wait_until_finish()


if __name__ == "__main__":
    main()

Exploring the results

Now I had a script that would start running in a few seconds and I had two variables I could tweak to figure out what was going on.

I would run the script, click the dashboard link once it was printed and then watch the output to see what was going on.

Screenshot of the Dask Dashboard showing the Beam graph

At first I kept the n_items and n_workers variables the same as I changed them because the initial testing from the data science folks said that they were assuming a 1:1 mapping between workers and files.

I found with tens of workers and files things would behave as expected and each worker would be given one task at a time. The workers would move through the four stages in the pipeline and then the job would complete.

I also tried doubling the number of items to have a 2:1 ratio with the Dask workers and I saw that each stage in the pipeline would just get done in two halves by the workers.

I did notice that the names that Dask gave to each stage did not match with the names of any of the functions or pipeline stages that I had created. Dask showed one task called from_sequence, one called flatten and one called finalize. There were always three times as many flatten tasks as from_sequence tasks, and there were always four finalize tasks regardless of the number of items/workers.

Screenshot of the Dask Dashboard showing progress bars

My assumption was that beam.Create mapped directly to from_sequence but then any beam.ParDo call would be translated to a flatten call. I had three beam.ParDo steps in my pipeline so it made sense why there were three times as many flatten calls. I also guessed that one finalize task would happen for each stage in my pipeline.

As I increased the number of items/workers these tasks would grow until I got into the 200s, then they would drop down and always stay a 404 tasks.

Workers Items Tasks
1 1 8 (1 * 4 + 4)
2 2 12 (2 * 4 + 4)
10 10 44 (10 * 4 + 4)
50 50 204 (50 * 4 + 4)
100 100 404 (100 * 4 + 4)
150 150 604 (150 * 4 + 4)
190 190 764 (190 * 4 + 4)
199 199 800 (199 * 4 + 4)
200 200 404 (?)
250 250 404 (?)
300 200 404 (?)

Things behaved as expected until hitting a threshold of 200, then the number of Dask tasks in each stage would drop down to 100 every time.

I also tried changing the number of workers and keeping it the same and exactly the same thing happened.

Workers Items Tasks
100 50 204 (50 * 4 + 4)
100 100 404 (100 * 4 + 4)
100 150 604 (150 * 4 + 4)
100 190 764 (190 * 4 + 4)
100 199 800 (199 * 4 + 4)
100 200 404 (?)
100 250 404 (?)
100 200 404 (?)

Creating an MRE

At this point I was confident that the problem was with how Beam generated the Dask Graph for it’s pipelines. So it was time for me to open an issue on Beam’s repo with a Minimal Reproducible Example (MRE).

It looked like once there are more than 200 items in a Beam PCollection they get batched under the hood into batches of 100 items. No matter how many items were in the PCollection Beam would create 100 flatten calls for each beam.ParDo.

If the PCollection has 199 items then it will create 199 flatten tasks, but beyond that it will only create 100. This effectively means that if your Dask cluster has more than 199 workers (or more than 100 if your PCollection is larger than 199) then those workers will not be assigned any tasks.

Now that I know the problem is that the ParDo doesn’t scale when using the Dask Runner, rather than Dask not scaling I can create a simple script that will run on just my laptop.

I don’t need my Kubernetes cluster any more so I can easily clean that up.

$ gcloud container clusters delete jtomlinson-beam-test --zone us-central1-c

Here’s the final MRE script.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.dask.dask_runner import DaskRunner

from dask.distributed import Client, performance_report

class NoopDoFn(beam.DoFn):
    def process(self, item):
        import time
        time.sleep(0.1)
        return [item]


def run_pipeline(n_items):
    client = Client()
    pipeline = beam.Pipeline(
        runner=DaskRunner(),
        options=PipelineOptions(["--dask_client_address", client.cluster.scheduler_address]),
    )

    (
        pipeline
        | "Create collection" >> beam.Create(range(n_items))
        | "Noop 1" >> beam.ParDo(NoopDoFn())
        | "Noop 2" >> beam.ParDo(NoopDoFn())
        | "Noop 3" >> beam.ParDo(NoopDoFn())
    )

    with performance_report(filename=f"dask-report-{n_items}.html"):
        result = pipeline.run()
        result.wait_until_finish()

def main() -> None:
    # If this is 199 I get one task per item per stage (800 tasks, 199 concurrent)
    run_pipeline(199)
    # If this is 200 I get max 100 tasks per stage (404 tasks, 100 concurrent)
    run_pipeline(200)


if __name__ == "__main__":
    main()

This script uses dask.distributed.Client which by default creates a dask.distributed.LocalCluster that on my laptop creates a cluster with 12 workers.

Then it runs my no-op pipeline twice, once with 199 items and again with 200 items.

As I was looking at the dashboard to know how many tasks there were for each run I am using dask.distributed.performance_report() to capture the dashboard and write it to a file.

So this MRE script produces two reports dask-report-199.html and dask-report-200.html. I’ve uploaded those reports to a GitHub Gist to make it easy to share them in the issue along with the MRE script.

Summary

When large workloads fail at scale it can be daunting to dig into them a figure out what is going wrong. In this example the bug was happening on clusters with hundreds of GPUs, so the cost of repeatedly running the workload and trying to interactively debug it could feels like it could add up quickly.

But it’s likely that the scale of the problem can be emulated in some other way. In this example, once I ruled out the computation being a factor, I switched from hundreds of GPUs to hundreds of slower CPU cores which are much cheaper. But even then when I dug deeper into the problem I discovered I didn’t need all of those cores and could reproduce the problem on a laptop.

Instrumentation and telemetry is also key. The folks who discovered the bug did so by looking at resource usage from the batch system they were using. When I dug into the problem I heavily used the Dask dashboard and performance reports along with k9s to see what was going on in the Kubernetes cluster.

As with all debugging processes it’s a matter of isolating the problem and removing anything unnecessary. When debugging at scale you need to start removing the biggest, most expensive and most cumbersome things first. The deeper you get into debugging the less complex the problem starts to get. Moving from HPC to Cloud and finally to me laptop allowed me to iterate faster and faster with each development.


Have thoughts?

I love hearing feedback on my posts. You should head over to Twitter and let me know what you think!

Spotted a mistake? Why not suggest an edit!