#### Track notebooks, scripts & workflows

This guide walks from tracking data lineage in a notebook to tracking
parameters in workflows.

**Note:** To run examples, if you don't have a "lamindb" instance,
create one:

 !lamin init --storage ./test-track

### Manage notebooks and scripts

Call "track()" to save your notebook or script as a "transform" and
start tracking inputs & outputs of a run.

 import lamindb as ln

 ln.track()  # initiate a tracked notebook/script run

 # your code automatically tracks inputs & outputs

 ln.finish()  # mark run as finished, save execution report, source code & environment

You can find your notebooks and scripts in the "Transform" registry,
along with pipelines & functions:

 transform = ln.Transform.get(key="my_analyses/my_notebook.ipynb")
 transform.source_code # source code
 transform.runs.to_dataframe() # all runs in a dataframe
 transform.latest_run.report # report of latest run
 transform.latest_run.environment  # environment of latest run

You can use the CLI to load a transform into your current
(development) directory:

 lamin load --key my_analyses/my_notebook.ipynb

Here is how you'd load the notebook from the video into your local
directory:

 lamin load https://lamin.ai/laminlabs/lamindata/transform/F4L3oC6QsZvQ

##### Organize local development

If no development directory is set, script & notebook keys equal their
filenames. Otherwise, they represent the relative path in the
development directory. The exception is packaged source code, whose
keys have the form "pypackages/{package_name}/path/to/file.py".
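
The key rule for scripts and notebooks can be illustrated with a small
sketch. This is not lamindb's implementation; "derive_key" is a
hypothetical helper and the paths are made up for illustration. It
only mirrors the two cases described above (no development directory
versus a configured one) and ignores the packaged-source-code
exception.

```python
from pathlib import PurePosixPath
from typing import Optional


def derive_key(file_path: str, dev_dir: Optional[str] = None) -> str:
    """Hypothetical helper mirroring the key rule described in the text."""
    path = PurePosixPath(file_path)
    if dev_dir is None:
        # no development directory set: the key equals the filename
        return path.name
    # development directory set: the key is the path relative to it
    return path.relative_to(PurePosixPath(dev_dir)).as_posix()


notebook = "/home/me/dbs/project1/my_analyses/my_notebook.ipynb"
key_without_dev_dir = derive_key(notebook)
key_with_dev_dir = derive_key(notebook, dev_dir="/home/me/dbs/project1")
```

With a development directory, the key keeps the folder structure
("my_analyses/my_notebook.ipynb"), which is the form used in the
"ln.Transform.get()" and "lamin load --key" examples above.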

To set the development directory to your shell's current working
directory, run:

 lamin settings set dev-dir .

You can see the current status by running:

 lamin info

When you "cd" into that directory, you will now auto-connect to the
configured lamindb instance.

To sync scripts or workflows with their corresponding files in a git
repo, either export an environment variable:

 export LAMINDB_SYNC_GIT_REPO=<YOUR-GIT-REPO-URL>

Or set the following setting:

 ln.settings.sync_git_repo = <YOUR-GIT-REPO-URL>

If you work on a single project in your lamindb instance, it makes
sense to set LaminDB's "dev-dir" to the root of the local git repo
clone.

 dbs/
     project1/
         .git/
         .lamin/
         script1.py
         notebook1.ipynb
         ...

If you work on multiple projects in your lamindb instance, you can use
the "dev-dir" as the local root and nest git repositories in it.

 dbs/
     database1/
         .lamin/
         repo1/
             .git/
         repo2/
             .git/
         ...

##### Use projects

You can link the entities created during a run to a project.

 import lamindb as ln

 my_project = ln.Project(name="My project").save()  # create & save a project
 ln.track(project="My project")  # pass project
 open("sample.fasta", "w").write(">seq1\nACGT\n")  # create a dataset
 ln.Artifact("sample.fasta", key="sample.fasta").save()  # auto-labeled by project

Filter entities by project, e.g., artifacts:

 ln.Artifact.filter(projects=my_project).to_dataframe()

Access entities linked to a project:

 my_project.artifacts.to_dataframe()

The same works for "my_project.transforms" or "my_project.runs".

##### Use spaces

You can write the entities created during a run into a space that you
configure on LaminHub. This is particularly useful if you want to
restrict access to a space. Note that this doesn't affect bionty
entities, which should typically remain commonly accessible.

 ln.track(space="Our team space")

##### Track agent plans

Saving an agent plan automatically tags it with "artifact.kind = "plan""
and infers a "key" starting with ".plans/":

 lamin save /path/to/.cursor/plans/my_task.plan.md
 lamin save /path/to/.claude/plans/my_task.md

Link an agent plan against a run:

 ln.track(plan=".plans/my-agent-plan.md")

This links the "plan" artifact to a run in the same way as
"transform", an initiating run ("initiated_by_run"), and "report" /
"environment" artifacts are linked to the run.

While "transform" acts as the deterministic source code for the run
and "initiated_by_run" enables higher-level runs in workflow
orchestration, the agent "plan" complements these by linking a plan
that steers a non-deterministic agent.

### Manage workflows

Here we'll manage workflows with "lamindb"'s "flow()" and "step()"
decorators, which work out-of-the-box with the majority of Python
workflow managers:

| tool | workflow decorator | step/task decorator | notes |
| --- | --- | --- | --- |
| "lamindb" | "@flow" | "@step" | inspired by "prefect" |
| "prefect" | "@flow" | "@task" | two decorators |
| "redun" | "@task" (on main) | "@task" | single decorator for everything |
| "dagster" | "@job" or "@asset" | "@op" or "@asset" | asset-centric; "@asset" is primary |
| "flyte" | "@workflow" | "@task" | also "@dynamic" for runtime DAGs |
| "airflow" | "@dag" | "@task" | TaskFlow API (modern); also supports operators |
| "zenml" | "@pipeline" | "@step" | inspired by "prefect" |

If you're looking for more in-depth examples or for integrating with
non-decorator-based workflow managers such as Nextflow or Snakemake,
see Manage computational pipelines.

| tool | workflow | step/task | notes |
| --- | --- | --- | --- |
| "nextflow" | "workflow" keyword | "process" keyword | groovy-based DSL |
| "snakemake" | "rule" keyword | "rule" keyword | file-based DSL |
| "metaflow" | "FlowSpec" | "@step" | class-based |
| "kedro" | "Pipeline()" | "node()" | function-based |

##### A one-step workflow

Decorate a function with "flow()" to track it as a workflow:

my_workflow.py

 import lamindb as ln

 @ln.flow()
 def ingest_dataset(key: str) -> ln.Artifact:
     df = ln.examples.datasets.mini_immuno.get_dataset1()
     artifact = ln.Artifact.from_dataframe(df, key=key).save()
     return artifact

 if __name__ == "__main__":
     ingest_dataset(key="my_analysis/dataset.parquet")

Let's run the workflow:

 !python scripts/my_workflow.py

Query the workflow via its filename:

 transform = ln.Transform.get(key="my_workflow.py")
 transform.describe()

The run stored the parameter value for "key":

 transform.latest_run.describe()
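
To build intuition for how a decorator can record parameter values
without any extra code in the function body, here is a toy sketch in
plain Python. It is not how lamindb implements "flow()" or "step()";
"toy_flow", "toy_ingest", and the in-memory "RUNS" list are
hypothetical names used only for illustration.

```python
import functools
import inspect

RUNS = []  # in-memory stand-in for a run registry (illustration only)


def toy_flow():
    """Hypothetical decorator that records the call's parameter values."""

    def decorator(func):
        signature = inspect.signature(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # map positional and keyword arguments to parameter names
            bound = signature.bind(*args, **kwargs)
            # include parameters that were left at their default values
            bound.apply_defaults()
            RUNS.append({"function": func.__name__, "params": dict(bound.arguments)})
            return func(*args, **kwargs)

        return wrapper

    return decorator


@toy_flow()
def toy_ingest(key: str, subset: bool = False) -> str:
    return key


toy_ingest(key="my_analysis/dataset.parquet")
recorded_params = RUNS[-1]["params"]
```

The recorded dictionary contains both the explicitly passed "key" and
the default value of "subset", which is the kind of information a run
record needs in order to be queryable by parameter later.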

It links output artifacts:

 transform.latest_run.output_artifacts.to_dataframe()

You can query for all runs that ran with that parameter:

 ln.Run.filter(
     params__key="my_analysis/dataset.parquet",
 ).to_dataframe()

You can also pass complex parameters and features, see: Track
parameters & features.

##### A multi-step workflow

Here, the workflow calls an additional processing step:

my_workflow_with_step.py

 import lamindb as ln

 @ln.step()
 def subset_dataframe(
     artifact: ln.Artifact,
     subset_rows: int = 2,
     subset_cols: int = 2,
 ) -> ln.Artifact:
     df = artifact.load()
     new_data = df.iloc[:subset_rows, :subset_cols]
     new_key = artifact.key.replace(".parquet", "_subsetted.parquet")
     return ln.Artifact.from_dataframe(new_data, key=new_key).save()

 @ln.flow()
 def ingest_dataset(key: str, subset: bool = False) -> ln.Artifact:
     df = ln.examples.datasets.mini_immuno.get_dataset1()
     artifact = ln.Artifact.from_dataframe(df, key=key).save()
     if subset:
         artifact = subset_dataframe(artifact)
     return artifact

 if __name__ == "__main__":
     ingest_dataset(key="my_analysis/dataset.parquet", subset=True)

Let's run the workflow:

 !python scripts/my_workflow_with_step.py

The lineage of the subsetted artifact resolves the subsetting step:

 subsetted_artifact = ln.Artifact.get(key="my_analysis/dataset_subsetted.parquet")
 subsetted_artifact.view_lineage()

This is the run that created the subsetted_artifact:

 subsetted_artifact.run

This is the initiating run that triggered the function call:

 subsetted_artifact.run.initiated_by_run
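
The relationship between a step's run and its initiating run can be
pictured with a toy model. The sketch below is an assumption-laden
illustration in plain Python, not lamindb's mechanism: "ToyRun",
"toy_tracked", and the module-level lists are hypothetical. It keeps a
stack of active runs so that a run started inside another run records
the outer one as its initiator.

```python
import functools
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyRun:
    name: str
    initiated_by_run: Optional["ToyRun"] = None


_ACTIVE_RUNS = []   # stack of runs that are currently executing
FINISHED_RUNS = []  # runs in the order in which they finished


def toy_tracked(func):
    """Hypothetical decorator: create a run and link it to the enclosing run."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        parent = _ACTIVE_RUNS[-1] if _ACTIVE_RUNS else None
        run = ToyRun(name=func.__name__, initiated_by_run=parent)
        _ACTIVE_RUNS.append(run)
        try:
            return func(*args, **kwargs)
        finally:
            _ACTIVE_RUNS.pop()
            FINISHED_RUNS.append(run)

    return wrapper


@toy_tracked
def subset_rows(rows):
    return rows[:2]


@toy_tracked
def ingest(rows, subset=False):
    return subset_rows(rows) if subset else rows


result = ingest([1, 2, 3], subset=True)
step_run, flow_run = FINISHED_RUNS  # the inner step finishes first
```

Here "step_run.initiated_by_run" is "flow_run", while
"flow_run.initiated_by_run" stays "None" because nothing was running
when the outer function was called.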

These are the parameters of the run:

 subsetted_artifact.run.params

These are the input artifacts:

 subsetted_artifact.run.input_artifacts.to_dataframe()

These are output artifacts:

 subsetted_artifact.run.output_artifacts.to_dataframe()

##### A workflow with CLI arguments

Let's use "click" to parse CLI arguments:

my_workflow_with_click.py

 import click
 import lamindb as ln

 @click.command()
 @click.option("--key", required=True)
 @ln.flow()
 def main(key: str):
     df = ln.examples.datasets.mini_immuno.get_dataset2()
     ln.Artifact.from_dataframe(df, key=key).save()

 if __name__ == "__main__":
     main()

Let's run the workflow:

 !python scripts/my_workflow_with_click.py --key my_analysis/dataset2.parquet

CLI arguments are tracked and accessible via "run.cli_args":

 run = ln.Run.filter(transform__key="my_workflow_with_click.py").first()
 run.describe()

Note that it doesn't matter whether you use "click", "argparse", or
any other CLI argument parser.

### Track parameters & features

We just saw that the function decorators "@ln.flow()" and "@ln.step()"
track parameter values automatically. Here is how to pass parameters
to "ln.track()":

run_track_with_params.py

 import argparse
 import lamindb as ln

 if __name__ == "__main__":
     p = argparse.ArgumentParser()
     p.add_argument("--input-dir", type=str)
     p.add_argument("--downsample", action="store_true")
     p.add_argument("--learning-rate", type=float)
     args = p.parse_args()
     params = {
         "input_dir": args.input_dir,
         "learning_rate": args.learning_rate,
         "preprocess_params": {
             "downsample": args.downsample,
             "normalization": "the_good_one",
         },
     }
     ln.track(params=params)

     # your code

     ln.finish()

Run the script.

 !python scripts/run_track_with_params.py  --input-dir ./mydataset --learning-rate 0.01 --downsample
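
To see which dictionary the command above hands to "ln.track()", the
argument parsing can be reproduced in isolation. The sketch below
copies the parser from the script but leaves out lamindb entirely;
"build_params" is a hypothetical helper that takes the argument list
explicitly instead of reading it from the command line.

```python
import argparse


def build_params(argv):
    """Parse the same CLI arguments as the script and assemble the params dict."""
    p = argparse.ArgumentParser()
    p.add_argument("--input-dir", type=str)
    p.add_argument("--downsample", action="store_true")
    p.add_argument("--learning-rate", type=float)
    args = p.parse_args(argv)
    return {
        "input_dir": args.input_dir,
        "learning_rate": args.learning_rate,
        "preprocess_params": {
            "downsample": args.downsample,
            "normalization": "the_good_one",
        },
    }


params = build_params(
    ["--input-dir", "./mydataset", "--learning-rate", "0.01", "--downsample"]
)
```

Note that "argparse" converts "--learning-rate" to a float and
"--downsample" to a boolean, so the recorded values are typed rather
than raw strings.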

Query for all runs that match certain parameters:

 ln.Run.filter(
     params__learning_rate=0.01,
     params__preprocess_params__downsample=True,
 ).to_dataframe()
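
The double underscores in the filter above address nested keys of the
params dictionary. As a rough mental model only (the actual query runs
in the database, and "lookup" and "matches" are hypothetical helpers),
the matching logic resembles the following plain-Python sketch, where
paths are written relative to "params":

```python
from typing import Any


def lookup(params: dict, path: str) -> Any:
    """Follow a double-underscore path such as 'preprocess_params__downsample'."""
    value: Any = params
    for part in path.split("__"):
        value = value[part]
    return value


def matches(params: dict, **conditions: Any) -> bool:
    """Return True if every path resolves and equals the expected value."""
    try:
        return all(
            lookup(params, path) == expected for path, expected in conditions.items()
        )
    except (KeyError, TypeError):
        # a missing key or a non-dict intermediate value means no match
        return False


run_params = {
    "input_dir": "./mydataset",
    "learning_rate": 0.01,
    "preprocess_params": {"downsample": True, "normalization": "the_good_one"},
}
is_match = matches(
    run_params, learning_rate=0.01, preprocess_params__downsample=True
)
```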

Describe & get parameters:

 run = ln.Run.filter(params__learning_rate=0.01).order_by("-started_at").first()
 run.describe()
 run.params

You can also access the CLI arguments used to start the run directly:

 run.cli_args

You can also track run features in analogy to artifact features.

In contrast to params, features are validated against the "Feature"
registry and allow you to express relationships with entities in your
registries.
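
The difference between free-form params and validated features can be
sketched with a toy check. This is only an illustration of the idea of
validating against a registry, not lamindb's validation logic; the
"TOY_FEATURES" mapping and "find_problems" are hypothetical, and the
feature names anticipate the example that follows.

```python
# Hypothetical stand-in for a feature registry (illustration only):
# a type means "value must be of this type",
# a set means "value must be one of these known labels".
TOY_FEATURES = {
    "s3_folder": str,
    "experiment": {"Experiment1"},
}


def find_problems(values: dict) -> list:
    """Return a list of human-readable validation problems (empty if valid)."""
    problems = []
    for name, value in values.items():
        if name not in TOY_FEATURES:
            problems.append(f"unknown feature: {name}")
            continue
        spec = TOY_FEATURES[name]
        if isinstance(spec, set):
            if value not in spec:
                problems.append(f"unknown label for {name}: {value}")
        elif not isinstance(value, spec):
            problems.append(f"wrong type for {name}: {type(value).__name__}")
    return problems


valid = find_problems(
    {"s3_folder": "s3://my-bucket/my-folder", "experiment": "Experiment1"}
)
invalid = find_problems({"experiment": "Experiment2", "learning_rate": 0.01})
```

A params dictionary would accept "learning_rate" without any prior
definition, whereas in this toy model it is reported as an unknown
feature until it has been registered.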

Let's first define labels & features.

 experiment_type = ln.Record(name="Experiments", is_type=True).save()
 experiment_label = ln.Record(name="Experiment1", type=experiment_type).save()
 ln.Feature(name="s3_folder", dtype=str).save()
 ln.Feature(name="experiment", dtype=experiment_type).save()

 !python scripts/run_track_with_features_and_params.py  --s3-folder s3://my-bucket/my-folder --experiment Experiment1

 ln.Run.filter(s3_folder="s3://my-bucket/my-folder").to_dataframe()

Describe & get feature values.

 run2 = ln.Run.filter(
     s3_folder="s3://my-bucket/my-folder", experiment="Experiment1"
 ).last()
 run2.describe()
 run2.features.get_values()

### Manage functions in scripts and notebooks

If you want more fine-grained data lineage tracking in a script or
notebook where you called "ln.track()", you can also use the "step()"
decorator.

##### In a notebook

 @ln.step()
 def subset_dataframe(
     input_artifact_key: str,
     output_artifact_key: str,
     subset_rows: int = 2,
     subset_cols: int = 2,
 ) -> None:
     artifact = ln.Artifact.get(key=input_artifact_key)
     dataset = artifact.load()
     new_data = dataset.iloc[:subset_rows, :subset_cols]
     ln.Artifact.from_dataframe(new_data, key=output_artifact_key).save()

Prepare a test dataset:

 df = ln.examples.datasets.mini_immuno.get_dataset1(otype="DataFrame")
 input_artifact_key = "my_analysis/dataset.parquet"
 artifact = ln.Artifact.from_dataframe(df, key=input_artifact_key).save()

Run the function, overriding the default for "subset_rows":

 output_artifact_key = input_artifact_key.replace(".parquet", "_subsetted.parquet")
 subset_dataframe(input_artifact_key, output_artifact_key, subset_rows=1)

Query for the output:

 subsetted_artifact = ln.Artifact.get(key=output_artifact_key)
 subsetted_artifact.view_lineage()

Re-run the function with a different parameter:

 subset_dataframe(input_artifact_key, output_artifact_key, subset_cols=3)
 subsetted_artifact = ln.Artifact.get(key=output_artifact_key)
 subsetted_artifact.view_lineage()

We created a new run:

 subsetted_artifact.run

With new parameters:

 subsetted_artifact.run.params

And a new version of the output artifact:

 subsetted_artifact.run.output_artifacts.to_dataframe()

##### In a script

run_script_with_step.py

 import argparse
 import lamindb as ln

 @ln.step()
 def subset_dataframe(
     artifact: ln.Artifact,
     subset_rows: int = 2,
     subset_cols: int = 2,
     run: ln.Run | None = None,
 ) -> ln.Artifact:
     dataset = artifact.load(is_run_input=run)
     new_data = dataset.iloc[:subset_rows, :subset_cols]
     new_key = artifact.key.replace(".parquet", "_subsetted.parquet")
     return ln.Artifact.from_dataframe(new_data, key=new_key, run=run).save()

 if __name__ == "__main__":
     p = argparse.ArgumentParser()
     p.add_argument("--subset", action="store_true")
     args = p.parse_args()

     params = {"is_subset": args.subset}

     ln.track(params=params)

     if args.subset:
         df = ln.examples.datasets.mini_immuno.get_dataset1(otype="DataFrame")
         artifact = ln.Artifact.from_dataframe(
             df, key="my_analysis/dataset.parquet"
         ).save()
         subsetted_artifact = subset_dataframe(artifact)

     ln.finish()

 !python scripts/run_script_with_step.py --subset

 ln.view()

### The database

See the state of the database after we ran these different examples:

 ln.view()

### Using transform versions as templates

A transform acts like a template when you load it with "lamin load".
Suppose you run:

 lamin load https://lamin.ai/account/instance/transform/Akd7gx7Y9oVO0000

Upon running the returned notebook or script, you'll automatically
create a new version and be able to browse it via the version dropdown
on the UI.

Additionally, you can:

* label using "ULabel" or "Record", e.g.,
  "transform.records.add(template_label)"

* tag with an indicative "version" string, e.g., "transform.version =
  "T1"; transform.save()"

-[ Saving a notebook as an artifact ]-

Sometimes you might want to save a notebook as an artifact. This is
how you can do it:

 lamin save template1.ipynb --key templates/template1.ipynb --description "Template for analysis type 1" --registry artifact

A few checks at the end of this notebook:

 assert run.params == {
     "input_dir": "./mydataset",
     "learning_rate": 0.01,
     "preprocess_params": {"downsample": True, "normalization": "the_good_one"},
 }, run.params
 assert my_project.artifacts.exists()
 assert my_project.transforms.exists()
 assert my_project.runs.exists()