Nextflow

Nextflow is a workflow management system for executing scientific workflows scalably, portably, and reproducibly across platforms.

Here, we’ll run a demo of the MCMICRO microscopy pipeline to correct uneven illumination (see Reference).

Note

Typically, you run a Nextflow workflow from the command line or from Seqera Platform, and then register its input and output data with a script. Seqera Platform supports post-run scripts that can automate this registration step.

!lamin init --storage . --name nextflow-mcmicro
Hide code cell output
💡 connected lamindb: testuser1/nextflow-mcmicro
import lamindb as ln
💡 connected lamindb: testuser1/nextflow-mcmicro

Run mcmicro/exemplar pipeline

Download the exemplar-001 data and register it as input to the MCMICRO pipeline by running the script below:

Hide code cell source
from IPython.display import display, Code

# Render the download/registration helper script as highlighted Python source.
with open("mcmicro_exemplar.py") as script_file:
    script_text = script_file.read()
display(Code(script_text, language="python"))
Hide code cell output
# mcmicro_exemplar.py

"""This script downloads the MCMICRO exemplar data and registers it in LaminDB.

Usage: python mcmicro_exemplar.py exemplar-001
"""

import lamindb as ln
import subprocess
import argparse


def _latest_nextflow_id():
    """Return the execution ID of the most recent run listed by ``nextflow log``.

    ``nextflow log`` prints one tab-separated row per run with the latest run at
    the bottom; the ID is read from the sixth column. Only stdout is parsed
    (``subprocess.getoutput`` would also mix stderr into the text), and the
    value is stripped of surrounding whitespace. Returns an empty string when
    there is no row or the last row has fewer than six columns.
    """
    log = subprocess.run(
        ["nextflow", "log"], capture_output=True, text=True, check=True
    ).stdout
    rows = [row for row in log.splitlines() if row.strip()]
    if not rows:
        return ""
    fields = rows[-1].split("\t")
    return fields[5].strip() if len(fields) >= 6 else ""


# get args from command line
parser = argparse.ArgumentParser()
parser.add_argument("name", type=str, help="Name of the example data.")
args = parser.parse_args()
if args.name not in ("exemplar-001", "exemplar-002"):
    raise ValueError("Invalid name. Use 'exemplar-001' or 'exemplar-002'.")

# execute the nextflow pipeline to download example data; check=True aborts the
# script (before anything is registered) if the pipeline fails
report = f"{args.name}_mcmicro-exemplar_execution_report.html"
subprocess.run(
    [
        "nextflow",
        "run",
        "labsyspharm/mcmicro/exemplar.nf",
        "--name",
        args.name,
        "-with-report",
        report,
    ],
    check=True,
)

# get the nextflow execution id of the run that just finished (last row of the log)
nextflow_id = _latest_nextflow_id()

# track the pipeline transform
transform = ln.Transform(
    name="MCMICRO exemplar",
    version="1.0.0",
    type="pipeline",
    reference="https://github.com/labsyspharm/mcmicro",
)
run = ln.track(transform=transform)
# optionally, tag the transform
ulabel = ln.ULabel(name="nextflow").save()
run.transform.ulabels.add(ulabel)
# track the execution report, set visibility to "hidden" to avoid cluttering the artifacts
report_artifact = ln.Artifact(
    report, description=f"nextflow report of {nextflow_id}", visibility=0, run=False
).save()
run.report = report_artifact
run.reference = nextflow_id
run.reference_type = "nextflow_id"
run.save()
# optionally, track the pipeline parameters
ln.Param(name="name", dtype="str").save()
run.params.add_values({"name": args.name})

# register the downloaded folder
exemplar_dir = ln.Artifact(args.name, description=args.name)
exemplar_dir.save()

ln.finish()
!python mcmicro_exemplar.py exemplar-001
Hide code cell output
💡 connected lamindb: testuser1/nextflow-mcmicro
N E X T F L O W  ~  version 24.04.3
Pulling labsyspharm/mcmicro ...
 downloaded from https://github.com/labsyspharm/mcmicro.git
Launching `https://github.com/labsyspharm/mcmicro` [compassionate_payne] DSL2 - revision: 5883dda717 [master]
[be/d4af41] Submitted process > getMarkers
[3e/60160f] Submitted process > getImages (1)
[6c/c8fa84] Submitted process > getIllumination (3)
[7a/edaecb] Submitted process > getImages (3)
[70/5079a9] Submitted process > getIllumination (2)
[b1/ecc620] Submitted process > getImages (2)
[2d/ae3f1a] Submitted process > getIllumination (1)
💡 saved: Transform(uid='HXJzgRZiHrZ6cSof', version='1.0.0', name='MCMICRO exemplar', type='pipeline', reference='https://github.com/labsyspharm/mcmicro', created_by_id=1, updated_at='2024-07-26 14:37:05 UTC')
💡 saved: Run(uid='E7G9YabclsJILSkVERHz', transform_id=1, created_by_id=1)

Run mcmicro pipeline

Run the MCMICRO pipeline and track its input and output data with the script below:

Hide code cell source
# Render the pipeline-tracking script as highlighted Python source.
script_path = "mcmicro.py"
with open(script_path) as script_file:
    script_text = script_file.read()
display(Code(script_text, language="python"))
# mcmicro.py

"""This script runs the MCMICRO pipeline and tracks input/output data in LaminDB.

Usage: python mcmicro.py exemplar-001
"""

import subprocess
import argparse
import lamindb as ln
import yaml


def _latest_nextflow_id():
    """Return the execution ID of the most recent run listed by ``nextflow log``.

    ``nextflow log`` prints one tab-separated row per run with the latest run at
    the bottom; the ID is read from the sixth column. Only stdout is parsed
    (``subprocess.getoutput`` would also mix stderr into the text), and the
    value is stripped of surrounding whitespace. Returns an empty string when
    there is no row or the last row has fewer than six columns.
    """
    log = subprocess.run(
        ["nextflow", "log"], capture_output=True, text=True, check=True
    ).stdout
    rows = [row for row in log.splitlines() if row.strip()]
    if not rows:
        return ""
    fields = rows[-1].split("\t")
    return fields[5].strip() if len(fields) >= 6 else ""


# get args from command line
parser = argparse.ArgumentParser()
parser.add_argument("input", type=str, help="Input folder name.")
args = parser.parse_args()

transform = ln.Transform(
    name="MCMICRO",
    version="1.0.0",
    type="pipeline",
    reference="https://github.com/labsyspharm/mcmicro",
)
run = ln.track(transform=transform)

# get the input data from LaminDB (looked up by the description it was registered with)
mcmicro_input = ln.Artifact.filter(description=args.input).one()
input_dir = mcmicro_input.cache()

# execute the MCMICRO pipeline from the illumination step through registration;
# check=True aborts the script (before any output is registered) if it fails
report = f"{args.input}-mcmicro-execution_report.html"
subprocess.run(
    [
        "nextflow",
        "run",
        "https://github.com/labsyspharm/mcmicro",
        "--in",
        str(input_dir),
        "--start-at",
        "illumination",
        "--stop-at",
        "registration",
        "-with-report",
        report,
    ],
    check=True,
)

# get the nextflow execution id of the run that just finished (last row of the log)
nextflow_id = _latest_nextflow_id()
# optionally, tag the transform
ulabel = ln.ULabel(name="nextflow").save()
run.transform.ulabels.add(ulabel)
# track the execution report, set visibility to "hidden" to avoid cluttering the artifacts
report_artifact = ln.Artifact(
    report, description=f"nextflow report of {nextflow_id}", visibility=0, run=False
).save()
run.report = report_artifact
run.reference = nextflow_id
run.reference_type = "nextflow_id"
run.save()
# optionally, track the pipeline parameters (MCMICRO writes them to <input>/qc/params.yml)
with open(f"{input_dir}/qc/params.yml", encoding="utf-8") as params_file:
    qc_params = yaml.safe_load(params_file)
ln.Param(name="qc_params", dtype="dict").save()
run.params.add_values({"qc_params": qc_params})

# register the output artifact (one artifact per file in the registration folder)
output = ln.Artifact.from_dir(f"{input_dir}/registration")
ln.save(output)

ln.finish()
!python mcmicro.py exemplar-001
Hide code cell output
💡 connected lamindb: testuser1/nextflow-mcmicro
❗ record with similar name exists! did you mean to load it?
<QuerySet [Transform(uid='HXJzgRZiHrZ6cSof', version='1.0.0', name='MCMICRO exemplar', type='pipeline', reference='https://github.com/labsyspharm/mcmicro', created_by_id=1, updated_at='2024-07-26 14:37:05 UTC')]>
💡 saved: Transform(uid='gMaCA4s3GSkwZOyX', version='1.0.0', name='MCMICRO', type='pipeline', reference='https://github.com/labsyspharm/mcmicro', created_by_id=1, updated_at='2024-07-26 14:37:08 UTC')
💡 saved: Run(uid='Fc8ENFkKm2h3YdMOu6LW', transform_id=2, created_by_id=1)
N E X T F L O W  ~  version 24.04.3
Launching `https://github.com/labsyspharm/mcmicro` [clever_wiles] DSL2 - revision: 5883dda717 [master]
[14/8c62e1] Submitted process > illumination (3)
[7b/026a27] Submitted process > illumination (2)
[a6/d3fd1d] Submitted process > illumination (1)
[c9/fdf4b8] Submitted process > registration:ashlar (1)
💡 returning existing ULabel record with same name: 'nextflow'
❗ this creates one artifact per file in the directory - you might simply call ln.Artifact(dir) to get one artifact for the entire directory

Data lineage

View data lineage:

# Look up the registered image file that mcmicro.py stored from the
# `registration` output folder, then render its lineage graph (runs,
# transforms, and input artifacts that led to it).
output = ln.Artifact.filter(key__icontains="exemplar-001.ome.tif").one()
output.view_lineage()
_images/105cefeb040f137284188c18aae2c26fdaaa893714e7120d88dd3038f50ca0eb.svg

View transforms and runs in LaminHub

hub

View the database content

# Show the records in this instance, one table per registry (Artifact, Run,
# Storage, Transform, ULabel, User in the output below).
ln.view()
Artifact
uid version description key suffix type accessor size hash hash_type n_objects n_observations visibility key_is_virtual storage_id transform_id run_id created_by_id updated_at
id
4 2TwWA8djdHQ0rbTuWEeZ None None exemplar-001/registration/exemplar-001.ome.tif .tif dataset None 175490712 gHRVbQOFd4P2eO5_EMexZr sha1-fl NaN None 1 False 1 2 2 1 2024-07-26 14:38:57.045982+00:00
2 a1nqhzfJQroJWsRC0g1Z None exemplar-001 exemplar-001 dataset None 332630483 lpOStRajj_SuLBtzXF4KjQ md5-d 10.0 None 1 False 1 1 1 1 2024-07-26 14:37:06.768824+00:00
Run
uid started_at finished_at is_consecutive reference reference_type transform_id report_id environment_id created_by_id
id
1 E7G9YabclsJILSkVERHz 2024-07-26 14:37:05.469329+00:00 2024-07-26 14:37:06.771436+00:00 None 00ec8982-8172-4915-8f9a-d7ce54abebdc nextflow_id 1 1 None 1
2 Fc8ENFkKm2h3YdMOu6LW 2024-07-26 14:37:08.882861+00:00 2024-07-26 14:38:57.048630+00:00 None 5596733f-2bc8-4e2b-b2df-d2b6d55ba656 nextflow_id 2 3 None 1
Storage
uid root description type region instance_uid run_id created_by_id updated_at
id
1 RwbhtZx7metu /home/runner/work/nextflow-lamin/nextflow-lami... None local None None None 1 2024-07-26 14:36:43.478136+00:00
Transform
uid version name key description type reference reference_type latest_report_id source_code_id created_by_id updated_at
id
2 gMaCA4s3GSkwZOyX 1.0.0 MCMICRO None None pipeline https://github.com/labsyspharm/mcmicro None None None 1 2024-07-26 14:37:08.878023+00:00
1 HXJzgRZiHrZ6cSof 1.0.0 MCMICRO exemplar None None pipeline https://github.com/labsyspharm/mcmicro None None None 1 2024-07-26 14:37:05.463292+00:00
ULabel
uid name description reference reference_type run_id created_by_id updated_at
id
1 alvQoyv4 nextflow None None None 1 1 2024-07-26 14:38:56.906401+00:00
User
uid handle name updated_at
id
1 DzTjkKse testuser1 Test User1 2024-07-26 14:36:43.473391+00:00
Hide code cell content
# clean up the test instance:
# NOTE(review): the instance was initialized with `--storage .`, so its storage
# root is the notebook's working directory. Deleting the instance then tries to
# rmdir that directory, which fails while other files remain in it (see the
# OSError below). Consider initializing with a dedicated storage subdirectory.
!lamin delete --force nextflow-mcmicro
💡 deleting instance testuser1/nextflow-mcmicro
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.10.14/x64/bin/lamin", line 8, in <module>
    sys.exit(main())
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/site-packages/rich_click/rich_command.py", line 367, in __call__
    return super().__call__(*args, **kwargs)
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/site-packages/rich_click/rich_command.py", line 152, in main
    rv = self.invoke(ctx)
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/site-packages/lamin_cli/__main__.py", line 105, in delete
    return delete(instance, force=force)
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/site-packages/lamindb_setup/_delete.py", line 136, in delete
    isettings.storage.root.rmdir()
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/pathlib.py", line 1215, in rmdir
    self._accessor.rmdir(self)
OSError: [Errno 39] Directory not empty: '/home/runner/work/nextflow-lamin/nextflow-lamin/docs'