Monitoring and Managing Workflows in Databricks.

Manuel Gil
6 min read · Jun 5, 2023


In the world of data science and machine learning, Databricks has become one of the best-known platforms on which data scientists and machine learning engineers run their workloads. To ensure smooth execution and prompt error detection, it is crucial to have a robust monitoring system in place that continuously retrieves a workflow's state, enabling real-time monitoring. In this post, we will explore the mechanisms Databricks provides for monitoring jobs.

Workflows vs Jobs

In Databricks, a workflow refers to a logical sequence of multiple data-processing tasks. It provides a flexible framework that enables data scientists, data analysts, and machine learning engineers to build ETL pipelines as well as analytical and ML workflows. Overall, it allows for the creation of intricate workflows without the need to manage complex infrastructure.

On the other hand, a job refers to the execution of a specific task or set of tasks within a workflow; in other words, it refers to a single run of the code, script, or any other supported Databricks task.

In the image shown above, we can observe an example workflow consisting of three tasks. Each task is associated with code or a notebook that needs to be executed, along with any required parameters and configurations. The execution of a job can be scheduled to run at specific intervals or triggered manually as needed. Databricks offers a web-based interface and APIs for creating, managing, and monitoring jobs.

Databricks API requests

The purpose of this article is not to focus on the creation of workflows, but rather on monitoring their different states. For detailed information on creating workflows, we encourage you to read the Databricks documentation. Once a job is created, we can use the job_id to make API requests.

To handle API requests, we can use the requests library in Python. By constructing the endpoint and providing authentication through headers, we can interact with the API. The base URL for all Jobs API endpoints has the following format:

base_url = f"https://{browser_hostname}/api/2.1/jobs/"

Lastly, we include the authorization token in the headers of the API requests. The format for the headers is as follows:

headers = {"Authorization": f"Bearer {token}"}

As you may have noticed, we need to provide at least two variables when making API requests. The browser_hostname is the hostname portion of our Databricks workspace URL.

For authentication, we have two options. We can create a personal access token by going to User Settings in the workspace and generating a new one. Alternatively, if we are running the monitoring code from a Databricks notebook, we can retrieve both values from dbutils and the notebook context.

# Get parameters from notebook context
browser_hostname = dbutils.notebook.entry_point.getDbutils().notebook().getContext().browserHostName().getOrElse(None)
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)
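
If the monitoring code runs outside a notebook, the same two values have to be supplied in some other way, for example through environment variables. The snippet below is a minimal sketch; the variable names DATABRICKS_HOST and DATABRICKS_TOKEN are just a convention assumed here, not something the API requires.

import os

# Hypothetical environment variable names; adapt them to your own setup.
browser_hostname = os.environ["DATABRICKS_HOST"]  # e.g. "my-workspace.cloud.databricks.com"
token = os.environ["DATABRICKS_TOKEN"]            # personal access token from User Settings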

Triggering a job.

Now that we know how to construct the base URL, we can begin using the API to manage the newly created job. One of the initial tasks we may want to accomplish is running the job. To trigger a new job run, we can use the following endpoint: /api/2.1/jobs/run-now.

import json
import requests

def trigger_job(job_id) -> dict:
    """Triggers a new run of the job and returns the API response (including the run_id)."""
    endpoint = f"{base_url}run-now"
    response = requests.post(
        url=endpoint,
        data=json.dumps({"job_id": job_id}),
        headers=headers,
    )

    if response.status_code == 200:
        return response.json()
    else:
        return response.text

The function will trigger the job, initiating a new job run. In the following image, we can observe the state of the job in the workflow UI. The Run ID field specifies the identifier for this execution.
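
For example, calling the function with a job ID (the value shown here is just the example job used later in this article) returns a dictionary from which we can read the run identifier:

response = trigger_job(job_id=476435060329526)  # example job_id
job_run_id = response["run_id"]
print(job_run_id)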

To retrieve the state of the run, we can use a different endpoint by utilizing the job_run_id. The endpoint follows the structure: /api/2.1/jobs/runs/get. When we encapsulate this functionality into a function, the resulting code looks like this:

def get_job_run(job_run_id) -> dict:
    """Returns the metadata and state of a job run."""
    endpoint = f"{base_url}runs/get"
    response = requests.get(
        url=endpoint,
        params={"run_id": job_run_id},
        headers=headers,
    )
    if response.status_code == 200:
        return response.json()
    else:
        return response.text

To retrieve the state of the run, we need to provide its run ID, which is obtained from the response of the previous request. In this case, the response provides additional information, as illustrated below.

{
  "job_id": 476435060329526,
  "run_id": 27096,
  "creator_user_name": "manuel.gil-m@outlook.com",
  "number_in_job": 27096,
  "original_attempt_run_id": 27096,
  "state": {
    "life_cycle_state": "TERMINATED",
    "result_state": "SUCCESS",
    "state_message": "",
    "user_cancelled_or_timedout": false
  },
  "start_time": 1685849752959,
  "setup_duration": 1000,
  "execution_duration": 8000,
  "cleanup_duration": 0,
  "end_time": 1685849762805,
  "trigger": "ONE_TIME",
  "run_name": "test_job",
  "run_page_url": "-----#job/476435060329526/run/27096",
  "run_type": "JOB_RUN",
  "tasks": [
    {
      "run_id": 27940,
      "task_key": "task1",
      "notebook_task": {
        "notebook_path": "/Shared/job_notebooks/exploratory_data_analysis_test",
        "source": "WORKSPACE"
      },
      "existing_cluster_id": "---- ------ -----",
      "state": {
        "life_cycle_state": "TERMINATED",
        "result_state": "SUCCESS",
        "state_message": "",
        "user_cancelled_or_timedout": false
      },
      "run_page_url": "-----#job/476435060329526/run/27940",
      "start_time": 1685849752970,
      "setup_duration": 1000,
      "execution_duration": 8000,
      "cleanup_duration": 0,
      "end_time": 1685849762729,
      "cluster_instance": {
        "cluster_id": "--------------",
        "spark_context_id": "2092032582937742126"
      },
      "attempt_number": 0
    }
  ],
  "format": "MULTI_TASK"
}

While there are several fields we can examine, let’s focus on the fields that offer more information about the job’s state. One such field is called state, which provides the result and lifecycle state of the run. We can find this field in the main response, as well as within each of the tasks performed by the run. In this case, each task has its own run_id which we can utilize to retrieve the run results. The structure of this field is shown below.

Structure of the state object.
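
As a small illustration of how these nested fields can be used, the sketch below walks the tasks array of a run returned by get_job_run and prints each task's key and state; it assumes job_run_id was obtained from the trigger request as shown earlier.

job_run = get_job_run(job_run_id)
for task in job_run["tasks"]:
    task_state = task["state"]
    print(task["task_key"], task_state["life_cycle_state"], task_state.get("result_state"))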

When implementing a real-time monitoring solution, it is important to consider the life_cycle_state field, which provides information about the current status of the job. Additionally, the result_state field provides information associated with the final state of the job. Let’s monitor a job run using the function mentioned earlier. By implementing a while loop, we can make API requests at intervals and retrieve the state of the job (and the tasks) at those specific times.

import time

# "response" is the dictionary returned by trigger_job above.
job_run_id = response["run_id"]
final_states = ["TERMINATED", "SKIPPED", "INTERNAL_ERROR", "BLOCKED"]
final_state = None
while final_state not in final_states:
    job_run = get_job_run(job_run_id)
    final_state = job_run["state"]["life_cycle_state"]
    print("JOB STATE:")
    print(job_run["state"])
    print("TASK STATE:")
    print(job_run["tasks"][0]["state"])
    print("\n")
    time.sleep(40)

The final result looks like this.

JOB STATE:
{'life_cycle_state': 'RUNNING', 'state_message': '', 'user_cancelled_or_timedout': False}
TASK STATE:
{'life_cycle_state': 'PENDING', 'state_message': '', 'user_cancelled_or_timedout': False}


JOB STATE:
{'life_cycle_state': 'RUNNING', 'state_message': '', 'user_cancelled_or_timedout': False}
TASK STATE:
{'life_cycle_state': 'RUNNING', 'state_message': 'In run', 'user_cancelled_or_timedout': False}


JOB STATE:
{'life_cycle_state': 'RUNNING', 'state_message': '', 'user_cancelled_or_timedout': False}
TASK STATE:
{'life_cycle_state': 'RUNNING', 'state_message': 'In run', 'user_cancelled_or_timedout': False}


JOB STATE:
{'life_cycle_state': 'RUNNING', 'state_message': '', 'user_cancelled_or_timedout': False}
TASK STATE:
{'life_cycle_state': 'RUNNING', 'state_message': 'In run', 'user_cancelled_or_timedout': False}


JOB STATE:
{'life_cycle_state': 'TERMINATED', 'result_state': 'SUCCESS', 'state_message': '', 'user_cancelled_or_timedout': False}
TASK STATE:
{'life_cycle_state': 'TERMINATED', 'result_state': 'SUCCESS', 'state_message': '', 'user_cancelled_or_timedout': False}

We can observe that the life_cycle_state changes with each request. It is important to note that the result_state field only appears when the job reaches its final life cycle state.
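
Because result_state is absent while the run is still in progress, it is safer to read it with dict.get when parsing the response programmatically, for example:

result_state = job_run["state"].get("result_state")  # None until the run reaches a final state
if result_state == "SUCCESS":
    print("The run finished successfully.")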

The capabilities of the API extend far beyond what has been shown in this article. We can retrieve information from the tasks, create and delete jobs, and much more. In the upcoming articles, we will continue exploring Databricks APIs to monitor and manage workflows.
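
As a small taste of those other capabilities, the /api/2.1/jobs/list endpoint returns the jobs defined in the workspace. The sketch below reuses the base_url and headers from above; the exact fields in the response may vary, so treat it as an illustration rather than a reference.

def list_jobs() -> dict:
    """Returns the first page of jobs defined in the workspace."""
    response = requests.get(url=f"{base_url}list", headers=headers)
    response.raise_for_status()
    return response.json()

for job in list_jobs().get("jobs", []):
    print(job["job_id"], job["settings"]["name"])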

JobManager class
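
The functions defined throughout this article can be grouped into a small helper class so that the hostname, token, and endpoints are configured in one place. The code below is a minimal sketch of such a class written for this post; it simply reuses the requests shown earlier.

import json
import requests

class JobManager:
    """Thin wrapper around the Databricks Jobs 2.1 API for a single workspace."""

    def __init__(self, browser_hostname: str, token: str):
        self.base_url = f"https://{browser_hostname}/api/2.1/jobs/"
        self.headers = {"Authorization": f"Bearer {token}"}

    def trigger_job(self, job_id) -> dict:
        """Triggers a new run of the given job and returns the API response."""
        response = requests.post(
            url=f"{self.base_url}run-now",
            data=json.dumps({"job_id": job_id}),
            headers=self.headers,
        )
        return response.json() if response.status_code == 200 else response.text

    def get_job_run(self, job_run_id) -> dict:
        """Returns the metadata and state of a single job run."""
        response = requests.get(
            url=f"{self.base_url}runs/get",
            params={"run_id": job_run_id},
            headers=self.headers,
        )
        return response.json() if response.status_code == 200 else response.text

With an instance of this class, the monitoring loop above becomes job = JobManager(browser_hostname, token) followed by job.get_job_run(job_run_id), keeping all configuration in one object.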

I am passionate about data science and like to explain how these concepts can be used to solve problems in a simple way. If you have any questions or just want to connect, you can find me on LinkedIn or email me at manuelgilsitio@gmail.com.



Written by Manuel Gil

I am an engineer and I am passionate about the computer science field. I have two years of experience working as a data scientist, using data to solve industrial problems.
