Nextflow
Nextflow is the most widely used workflow manager in bioinformatics.
This guide shows how to register a Nextflow run, including its inputs and outputs, by running a Python script, using the nf-core/scrnaseq pipeline as an example.
The approach could be automated by deploying the script via:
- a serverless environment trigger (e.g., AWS Lambda)
- a post-run script on the Seqera Platform
What steps are executed by the nf-core/scrnaseq pipeline?
# Create a LaminDB instance named "test-nextflow" whose storage is the local folder ./test-nextflow.
!lamin init --storage ./test-nextflow --name test-nextflow
Run the pipeline
Let’s download the input data from an S3 bucket.
import lamindb as ln

# Copy the test inputs from the S3 bucket into a local "scrnaseq_input" folder.
source_uri = "s3://lamindb-test/scrnaseq_input"
input_path = ln.UPath(source_uri)
input_path.download_to("scrnaseq_input")
→ connected lamindb: testuser1/test-nextflow
Then run the nf-core/scrnaseq pipeline.
# `-profile docker,test` runs the pipeline in Docker with its test configuration,
# which takes all downloaded input files (in scrnaseq_input) as its input; the full
# equivalent command is shown below. `-resume` lets Nextflow reuse cached task results
# when this cell is re-run.
!nextflow run nf-core/scrnaseq -r 2.7.1 -profile docker,test -resume --outdir scrnaseq_output
What is the full run command for the test profile?
# Expanded form of `-profile docker,test`: the test profile's parameters spelled out explicitly.
nextflow run nf-core/scrnaseq -r 2.7.1 \
-profile docker \
-resume \
--outdir scrnaseq_output \
--input 'scrnaseq_input/samplesheet-2-0.csv' \
--skip_emptydrops \
--fasta 'https://github.com/nf-core/test-datasets/raw/scrnaseq/reference/GRCm38.p6.genome.chr19.fa' \
--gtf 'https://github.com/nf-core/test-datasets/raw/scrnaseq/reference/gencode.vM19.annotation.chr19.gtf' \
--aligner 'star' \
--protocol '10XV2' \
--max_cpus 2 \
--max_memory '6.GB' \
--max_time '6.h'
Run the registration script
After the pipeline has completed, a Python script registers its inputs and outputs in LaminDB.
nf-core/scrnaseq run registration (register_scrnaseq_run.py)
import argparse
import lamindb as ln
import json
import re
from pathlib import Path
def parse_arguments() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--input", type=str, required=True)
parser.add_argument("--output", type=str, required=True)
return parser.parse_args()
def register_pipeline_io(input_dir: str, output_dir: str, run: ln.Run) -> None:
    """Register input and output artifacts for an `nf-core/scrnaseq` run.

    Artifacts are built from ``input_dir`` with ``Artifact.from_dir`` (``run=False``),
    saved, and set as the run's input artifacts. Then two outputs under
    ``output_dir`` (the MultiQC folder and the STAR filtered count matrix) are
    saved as artifacts attributed to ``run``.
    """
    inputs = ln.Artifact.from_dir(input_dir, run=False)
    ln.save(inputs)
    run.input_artifacts.set(inputs)

    # (path under output_dir, artifact key) for each output worth registering
    outputs = (
        (f"{output_dir}/multiqc", "multiqc report"),
        (
            f"{output_dir}/star/mtx_conversions/combined_filtered_matrix.h5ad",
            "filtered_count_matrix.h5ad",
        ),
    )
    for source, artifact_key in outputs:
        ln.Artifact(source, key=artifact_key, run=run).save()
def _latest_match(folder: Path, pattern: str) -> Path:
    """Return the last file (by name) in ``folder`` matching the glob ``pattern``.

    ``Path.glob`` yields matches in arbitrary order. When a pipeline is re-run
    with ``-resume`` into the same outdir, ``pipeline_info`` can hold one
    timestamped file per run (e.g. several ``execution_report_*.html``), so
    taking the first match may pick a stale file. Picking the last name
    assumes the timestamp suffix sorts chronologically (nf-core's
    ``yyyy-MM-dd_HH-mm-ss`` style does) -- TODO confirm for other pipelines.

    Raises:
        FileNotFoundError: if nothing matches (instead of a bare StopIteration).
    """
    matches = sorted(folder.glob(pattern))
    if not matches:
        raise FileNotFoundError(f"no file matching {pattern!r} in {folder}")
    return matches[-1]


def register_pipeline_metadata(output_dir: str, run: ln.Run) -> None:
    """Register nf-core run metadata stored in the 'pipeline_info' folder.

    Steps:
    - tags the run's transform with a ``nextflow`` ulabel;
    - records the Nextflow run id (``run.reference``) and completion time
      (``run.finished_at``), both parsed from the execution report;
    - attaches the execution report and software-versions file as artifacts
      (``run.report`` / ``run.environment``);
    - stores the pipeline parameters JSON as the ``params`` run parameter;
    - saves ``run``.

    Args:
        output_dir: The pipeline's ``--outdir``; must contain ``pipeline_info``.
        run: The LaminDB run to annotate.

    Raises:
        FileNotFoundError: if the execution report, software-versions file or
            params file is missing from ``pipeline_info``.
    """
    from datetime import datetime

    pipeline_info = Path(output_dir) / "pipeline_info"

    ulabel = ln.ULabel(name="nextflow").save()
    run.transform.ulabels.add(ulabel)

    # nextflow run id, taken from the most recent execution report
    report_path = _latest_match(pipeline_info, "execution_report_*.html")
    content = report_path.read_text(encoding="utf-8")
    match = re.search(r"run id \[([^\]]+)\]", content)
    nextflow_id = match.group(1) if match else ""
    run.reference = nextflow_id
    run.reference_type = "nextflow_id"

    # completed at
    completion_match = re.search(r'<span id="workflow_complete">([^<]+)</span>', content)
    if completion_match:
        timestamp_str = completion_match.group(1).strip()
        # NOTE(review): %b is locale-dependent (assumes English month names), and the
        # parsed datetime is naive (no timezone) -- confirm how lamindb interprets it.
        run.finished_at = datetime.strptime(timestamp_str, "%d-%b-%Y %H:%M:%S")

    # execution report and software versions; the report attached here is the
    # same file the run id above was parsed from
    software_path = _latest_match(pipeline_info, "nf_core_pipeline_software*")
    for path, description, run_attr in [
        (report_path, "execution report", "report"),
        (software_path, "software versions", "environment"),
    ]:
        artifact = ln.Artifact(
            path,
            key=f"nextflow run {description} of {nextflow_id}",
            visibility=0,  # presumably hides these from default queries -- confirm in lamindb docs
            run=False,
        ).save()
        setattr(run, run_attr, artifact)

    # nextflow run parameters
    params_path = _latest_match(pipeline_info, "params*")
    with params_path.open(encoding="utf-8") as params_file:
        params = json.load(params_file)
    ln.Param(name="params", dtype="dict").save()
    run.params.add_values({"params": params})
    run.save()
def main() -> None:
    """Register the nf-core/scrnaseq run given by the command-line arguments.

    Creates (and saves) the pipeline transform and a new run. It then registers
    the run's input/output artifacts and the metadata from ``pipeline_info``.
    """
    args = parse_arguments()
    scrnaseq_transform = ln.Transform(
        key="scrna-seq",
        version="2.7.1",  # keep in sync with the `-r` revision passed to `nextflow run`
        type="pipeline",
        reference="https://github.com/nf-core/scrnaseq",
    ).save()
    run = ln.Run(transform=scrnaseq_transform).save()
    register_pipeline_io(args.input, args.output, run)
    register_pipeline_metadata(args.output, run)


# Only register when executed as a script, not when imported.
if __name__ == "__main__":
    main()
# Register the run: --input is the downloaded input folder, --output the pipeline's --outdir.
!python register_scrnaseq_run.py --input scrnaseq_input --output scrnaseq_output
Data lineage
The output data can now be accessed for analysis, with full lineage, from a different notebook or script.
# Look up the count matrix by the exact key the registration script assigned. A
# substring (icontains) match could also hit other artifacts whose key contains this text.
matrix_af = ln.Artifact.get(key="filtered_count_matrix.h5ad")
matrix_af.view_lineage()
View transforms & runs on the hub
View the database content
# Show the content of the current LaminDB instance.
ln.view()