Snakemake/Google-Life-Sciences aggregate funtion issue
0
1
Entering edit mode
3.0 years ago
Stephan ▴ 10

Hi, I'm trying to make a pipeline work on Google cloud using the --google-life-sciences flag of snakemake. I use a checkpoint and an aggregate function to retrieve the file names in storage and make a wildcard with them, which I need for the identify rule. The issue is when I try to use the aggregate function as input for the next rule, merge. If you look at error from snakemake, when it gets executed everything should be fine as it gets the right input files, although the inputs don't show up in the error. The description of the error from google-life-sciences confuses me though. Why does it want the output from the checkpoint split as input and why does it have double the bucket name? is there a way to fix this or an entirely differnt way?

My goal is to make the merge rule merge all the output from the identify rule into one file.

An example of the pipeline:

from google.cloud import storage 
from snakemake.remote.GS import RemoteProvider as GSRemoteProvider 
GS = GSRemoteProvider() 

        rule all:
            input:
                expand('pipeline_output/split_merged/{sample}_merged.txt', sample=['file1']) #file one contains three lines: file1, file2, file3

        #should make 3 files: file100.txt, file101.txt, file102.txt each containing one line.
    checkpoint split:
            input:
                "sample/{sample}.txt"
            output:
                directory("pipeline_output/split_files_{sample}")
            shell:
                """
                mkdir -p {output}
                split -d {input} {output}/{wildcards.sample} -l 1 --additional-suffix=.txt
                """
        #makes 3 new files with the same content just a different name. rule identify:
            input:
                sam = 'pipeline_output/split_files_{sample}/{i}.txt'
            output:
                txt = 'pipeline_output/split_files_identified_{sample}/{i}_identified.txt',
            shell:
                '''
                cat {input} > {output}
                '''
        #aggregate function. gets the file names from the files generated by the split checkpoint that are in 
        #cloud storage and uses those as wildcard. 
    def all_input(wildcards):
            checkpoint_output=checkpoints.split.get(sample=wildcards.sample).output[0]
            files= []
            blobs = []
            client = storage.Client()
            for blob in client.list_blobs('checkpoint_example', prefix='pipeline_output/split_files_%s' % (wildcards)):
                blobs.append(str(blob))
            for i in blobs:
                if '.txt' in i:
                    split_blob = i.split(',')
                    file_path = split_blob[1].split('/')
                    files.append(file_path[2][:-4])
            merge_inputs = expand('checkpoint_example/pipeline_output/split_files_identified_{sample}/{i}_identified.txt',
                   i=files, sample=wildcards.sample)
            print(merge_inputs)
            return GS.remote(merge_inputs) 

        #takes the output from the aggregate function which should be the 3 files made in the identify rule. 
        #and puts it all in one file. 
    rule merge:
            input:
                all_input
            output:
                'split_merged/{sample}_merged.txt'
            shell:
                '''
                cat {input} > {output}
                '''

Error from snakemake:

[Tue May 11 11:41:50 2021]
rule merge:
    input: checkpoint_example/pipeline_output/split_files_identified_file1/file100_identified.txt, checkpoint_example/pipeline_output/split_files_identified_file1/file101_identified.txt, checkpoint_example/pipeline_output/split_files_identified_file1/file102_identified.txt
    output: checkpoint_example/pipeline_output/split_merged/file1_merged.txt
    jobid: 1
    wildcards: sample=file1
    resources: machine_type=n1-standard


Error in rule merge:
    jobid: 1
    output: checkpoint_example/pipeline_output/split_merged/file1_merged.txt
    shell:

        cat checkpoint_example/pipeline_output/split_files_identified_file1/file100_identified.txt checkpoint_example/pipeline_output/split_files_identified_file1/file101_identified.txt checkpoint_example/pipeline_output/split_files_identified_file1/file102_identified.txt > checkpoint_example/pipeline_output/split_merged/file1_merged.txt

        (one of the commands exited with non-zero exit code; note that snakemake uses bash strict mode!)

The input for the rule is correct.

description I get from google-life-sciences for the error:

Building DAG of jobs...
        MissingInputException in line 54 of /workdir/example:
        Missing input files for rule merge:
        checkpoint_example/checkpoint_example/pipeline_output/split_files_file1
snakemake Google-Life-Sciences • 480 views
ADD COMMENT

Login before adding your answer.

Traffic: 2015 users visited in the last hour
Help About
FAQ
Access RSS
API
Stats

Use of this site constitutes acceptance of our User Agreement and Privacy Policy.

Powered by the version 2.3.6