{brickster}
provides full coverage of the Jobs REST
API’s (specifically version 2.1), allowing creation of multi-task jobs
via R code.
When listing the jobs the default behaviour is to returns the first
25 jobs, to configure the number of jobs returned you can specify
limit
and offset
.
The expand_tasks
parameter will return task and cluster
details for each job (default FALSE)
.
# list all jobs within Databricks workspace
# can control limit, offset, and if cluster/jobs details are returned
jobs <- db_jobs_list(limit = 10)
# list all runs within a specific job
job_runs <- db_jobs_runs_list(job_id = jobs[[1]]$job_id)
To return the details of a specific job you can use
db_jobs_get()
, this requires knowledge of the
job_id
which can be found via the user interface in
Databricks or using db_jobs_list()
.
Each job has one or more tasks which may have dependence upon each
other, execution of a job will flow through these tasks - this is known
as a “run” of a job. Jobs can be scheduled to start a run on a
particular schedule, or by a direct trigger such as
db_jobs_run_now()
.
There are a number of functions that are specific to job runs execution and metadata:
Purpose | Function |
---|---|
Trigger new run of an existing job | db_jobs_run_now() |
List runs of a job | db_jobs_runs_list() |
Retrieve metadata for a specific job run | db_jobs_runs_get() |
Retrieve metadata for a specific task run | db_jobs_runs_get_output() |
Export code and/or dashboard for task run | db_jobs_runs_export() |
Delete record of a job run | db_jobs_runs_delete() |
Cancel run of a job | db_jobs_runs_cancel() |
When using functions such as db_jobs_run_get_output()
and db_jobs_runs_export()
the required run_id
can refer to the id of a given task. Each task within a given “run” will
also have a run_id
that can be referenced.
{brickster}
enables creation of jobs from R, these jobs
can be simple single task jobs or complex multi-task jobs with
dependencies that share and re-use clusters.
A single-task job can be created with the following components:
job_task()
which defines a singular task
new_cluster()
required by job_task()
,
it defines the compute specifications
*_task()
, one of the task functions
e.g. notebook_task()
Below is the creation of a simple job that runs a notebook and has a paused schedule:
# define a job task
simple_task <- job_task(
task_key = "simple_task",
description = "a simple task that runs a notebook",
# specify a cluster for the job
new_cluster = new_cluster(
spark_version = "9.1.x-scala2.12",
driver_node_type_id = "m5a.large",
node_type_id = "m5a.large",
num_workers = 2,
cloud_attr = aws_attributes(ebs_volume_size = 32)
),
# this task will be a notebook
task = notebook_task(notebook_path = "/brickster/simple-notebook")
)
Jobs can be extended beyond a singular task, this next example
extends the simple job example by now having three tasks. Each
subsequent task depends on the priors completion before it can proceed,
each task also will define its own new_cluster
.
Whilst this job runs the same notebook each time for the sake of example it demonstrates how to build dependencies.
# one cluster definition, repeatedly use for each task
multitask_cluster <- new_cluster(
spark_version = "9.1.x-scala2.12",
driver_node_type_id = "m5a.large",
node_type_id = "m5a.large",
num_workers = 2,
cloud_attr = aws_attributes(ebs_volume_size = 32)
)
# each task will run the same notebook (just for this example)
multitask_task <- notebook_task(notebook_path = "/brickster/simple-notebook")
# create three simple tasks that will depend on each other
# task_a -> task_b -> task_c
task_a <- job_task(
task_key = "task_a",
description = "First task in the sequence",
new_cluster = multitask_cluster,
task = multitask_task
)
task_b <- job_task(
task_key = "task_b",
description = "Second task in the sequence",
new_cluster = multitask_cluster,
task = multitask_task,
depends_on = "task_a"
)
task_c <- job_task(
task_key = "task_c",
description = "Third task in the sequence",
new_cluster = multitask_cluster,
task = multitask_task,
depends_on = "task_b"
)
The multiple tasks example has one shortcoming - clusters are not reused between tasks and therefore each task will have to wait for another cluster to be started before its computations can begin, this will typically add a few minutes to each task.
It can be advantageous to reuse clusters between job tasks to avoid overhead for short lived tasks, or to share resources for tasks which are not computationally demanding.
This example is the same as the previous multi-task job however it now shares a single cluster for each task.
new_cluster
is replaced with
job_cluster_key
in each task:
# create three simple tasks that will depend on each other
# this time we will use a shared cluster to reduce startup overhead
# task_a -> task_b -> task_c
task_a <- job_task(
task_key = "task_a",
description = "First task in the sequence",
job_cluster_key = "shared_cluster",
task = multitask_task
)
task_b <- job_task(
task_key = "task_b",
description = "Second task in the sequence",
job_cluster_key = "shared_cluster",
task = multitask_task,
depends_on = "task_a"
)
task_c <- job_task(
task_key = "task_c",
description = "Third task in the sequence",
job_cluster_key = "shared_cluster",
task = multitask_task,
depends_on = "task_b"
)
job_clusters
is now defined in the
db_jobs_create()
call where it accepts a named list of
new_clusters()
, we will reuse our example from earlier:
# define job_clusters as a named list of new_cluster()
multitask_job_with_reuse <- db_jobs_create(
name = "brickster example: multi-task with reuse",
job_clusters = list("shared_cluster" = multitask_cluster),
tasks = job_tasks(task_a, task_b, task_c),
# 9am every day, paused currently
schedule = cron_schedule(
quartz_cron_expression = "0 0 9 * * ?",
pause_status = "PAUSED"
)
)
There can be multiple shared clusters and it may be beneficial to give some demanding tasks a larger cluster alone (or to share with other demanding tasks).
It’s possible to create one-off jobs that do not have a schedule and
only appears under the “jobs runs” tab, not “jobs”.
db_jobs_runs_submit()
permits the use of an
idempotency_token
which can avoid re-submission of a
run.
Instead of using notebooks contained within the Databricks workspace a job can be referenced in an external git repository and reference a specific branch, tag, or commit.
Let’s revisit the simple job example from before and adjust to use
git_source()
.
In this example the repo will have a folder example
which
contains simple-notebook.py
.
# define a job task
# this time, the notebook_path is relative to the git root directory
# omit file extensions like .py or .r
simple_task <- job_task(
task_key = "simple_task",
description = "a simple task that runs a notebook",
# specify a cluster for the job
new_cluster = new_cluster(
spark_version = "9.1.x-scala2.12",
driver_node_type_id = "m5a.large",
node_type_id = "m5a.large",
num_workers = 2,
cloud_attr = aws_attributes(ebs_volume_size = 32)
),
# this task will be a notebook
task = notebook_task(notebook_path = "example/simple-notebook")
)
Within the call to db_jobs_create()
use
git_source()
to point to the repo containing the
notebook.
# create job with simple task
simple_task_job <- db_jobs_create(
name = "brickster example: simple",
tasks = job_tasks(simple_task),
# git source points to repo
git_source = git_source(
git_url = "www.github.com/<user>/<repo>",
git_provider = "github",
reference = "main",
type = "branch"
),
# 9am every day, paused currently
schedule = cron_schedule(
quartz_cron_expression = "0 0 9 * * ?",
pause_status = "PAUSED"
)
)
When specifying git_source()
all
notebook_task()
must be repo paths - it’s not possible to
have a mix of repo and workspace based notebooks. Additionally, this
behaviour only works for notebook_task()
, other tasks such
as spark_python_task()
are unsupported.
Jobs can be partially updated, for example to rename an existing job:
# only change the job name
db_jobs_update(
job_id = multitask_job_with_reuse$job_id,
name = "brickster example: renamed job"
)
Adding a timeout and adjusting maximum concurrent runs allowed:
# adding timeout and increasing max concurrent runs
db_jobs_update(
job_id = multitask_job_with_reuse$job_id,
timeout_seconds = 60 * 5,
max_concurrent_runs = 2
)
You can add new tasks but it may involve some extra steps, either:
Having job_task()
required for job defined already
within R and adding new tasks
Specifying all job_task()
’s again
Using db_jobs_get()
and parsing the returned
metadata into job_task()
’s
A complete overwrite of a job can be done with
db_jobs_reset()
- this allows the job_id
to
remain constant while maintaining the historical run information with
prior settings. db_jobs_reset()
is the same as
db_jobs_create()
except it takes one additional parameter
(job_id
).
db_jobs_list()
and db_jobs_runs_list()
were
covered in job basics.
Jobs can be triggered by:
Their defined schedule (see:
cron_schedule()
)
Using db_jobs_run_now()
to trigger a run outside
regular schedule or on a paused job.
Creating a one off job run via
db_jobs_runs_submit()
The only example yet to be covered is db_jobs_run_now()
,
it accepts parameters as named lists:
Runs can also be cancelled:
Jobs and runs can be deleted with db_jobs_delete()
and
db_jobs_runs_delete()
respectively.
Cleaning up all jobs created in this documentation: