groovy: How to call a Nextflow (DSL2) module on multiple inputs without duplicating code

Posted by vuktfyat 8 months ago in Other

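I am trying to collapse the following part of my workflow into something that does not require duplicated code, if possible. Note that the code works; I just want to package it more concisely. Here, splittingChannel_s1 and splittingChannel_s2 are the outputs of a previous step and the inputs to the current one.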

include { ProcessA_Module as ProcessA_section1_Module} from './modules/Processes/module.nf'
include { ProcessA_Module as ProcessA_section2_Module} from './modules/Processes/module.nf'

workflow Pipeline {
    main:
//Section based workflow
        def vc_params_s1 = parameters.getSubParam("VC", "section1")  
        Channel_s1 = ProcessA_section1_Module(
            'sec1',
            vc_params_s1,
            splittingChannel_s1,
            reference,
            resources.resolveOtherResourceFile("file1.txt"),
            resources.resolveOtherResourceFile("file2.txt")
        )
        
        def vc_params_s2 = parameters.getSubParam("VC", "section2") 
        Channel_s2 = ProcessA_section2_Module(
            'sec2',
            vc_params_s2,
            splittingChannel_s2,
            reference,
            resources.resolveOtherResourceFile("file1.txt"),
            resources.resolveOtherResourceFile("file2.txt")
        )
    }

Basically, what I would like to do is create a single call, such as:

Channel = ProcessA(
          ['sec1','sec2'],
          [vc_params_s1, vc_params_s2],
          [splittingChannel_s1,splittingChannel_s2],
          //these below are the same for both channels
          reference, 
          resources.resolveOtherResourceFile("file1.txt"),
          resources.resolveOtherResourceFile("file2.txt")
)

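This code returns a generic error such as `workflow Pipeline {: unexpected input`, so I am fairly sure this is not the way to do it. I also need to be able to refer to section 1 or section 2 of Channel afterwards, and I do not know how to do that either.
I also tried a different solution, but it is more cumbersome and returns the same error: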

def Processes_variables = [
    'sec1': [
        type: 'section1',
        vc_params: parameters.getSubParam("VC", "section1"),
        splittingChannel: splittingChannel_s1
    ],
    'sec2': [
        type: 'section2',
        vc_params: parameters.getSubParam("VC", "section2"),
        splittingChannel: splittingChannel_s2
    ]
]

def ProcessedChannels = Processes_variables.collect { key, config ->
    createVariantCallingChannel(
        config.type,
        config.vc_params,
        config.splittingChannel,
        reference.,
        resources.resolveOtherResourceFile("file1.txt"),
        resources.resolveOtherResourceFile("file2.txt")
    )
}

// Access the module calls for nDNA and mtDNA
def Channel_sec1 = ProcessedChannels.find { channel ->
    channel.type == 'sec1'
}

def Channel_sec2 = ProcessedChannels.find { channel ->
    channel.type == 'sec2'
}

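So the latter, apart from the syntax error, starts adding code rather than trimming it, which defeats the purpose. What is the proper way to avoid the code duplication? Many thanks!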

hjzp0vay #1

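One approach is to combine the variable components into tuples and then use the mix operator to merge the two channels into one. The following example assumes that the splittingChannel_s1 and splittingChannel_s2 channels emit simple file objects, e.g.: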

process my_proc {

    input:
    tuple val(section), val(vc_params), path(input_file)
    path reference
    path file1_txt
    path file2_txt

    output:
    tuple val(section), path("${input_file}.out")

    """
    touch "${input_file}.out"
    """
}

workflow {

    ...
    
    vc_params1 = parameters.getSubParam("VC", "section1")
    vc_params2 = parameters.getSubParam("VC", "section2")
    
    section1_ch = splittingChannel_s1.map { tuple( 'sec1', vc_params1, it ) }
    section2_ch = splittingChannel_s2.map { tuple( 'sec2', vc_params2, it ) }

    my_proc(
        section1_ch.mix( section2_ch ),
        reference,
        resources.resolveOtherResourceFile("file1.txt"),
        resources.resolveOtherResourceFile("file2.txt"),
    )

    my_proc.out
        .branch { section, outfile ->
            section1: section == "sec1"
            section2: section == "sec2"
        }
        .set { results }

    results.section1.view { section, outfile ->
        "Section1 results ${section}: ${outfile}"
    }
    results.section2.view { section, outfile ->
        "Section2 results ${section}: ${outfile}"
    }
}
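
The two near-identical `.map` lines in the example above can also be generated from a single map of sections, which may scale better if more sections are added. The following is a minimal, self-contained sketch rather than the answerer's code. `TAG_SECTION`, `section_params`, and `section_inputs` are hypothetical names, the `Channel.of(...)` values stand in for splittingChannel_s1 and splittingChannel_s2, and the plain strings stand in for whatever `parameters.getSubParam(...)` returns. It assumes a Nextflow release where DSL2 is the default.

```groovy
// main.nf: illustrative sketch, run with `nextflow run main.nf`

process TAG_SECTION {
    input:
    tuple val(section), val(vc_params), val(sample)

    output:
    tuple val(section), path("${sample}.${section}.out")

    script:
    """
    echo "${vc_params}" > "${sample}.${section}.out"
    """
}

workflow {
    // Hypothetical per-section settings (placeholders for parameters.getSubParam(...))
    def section_params = [ sec1: 'mode=fast', sec2: 'mode=sensitive' ]

    // Hypothetical inputs (placeholders for splittingChannel_s1 / splittingChannel_s2)
    def section_inputs = [
        sec1: Channel.of('a', 'b'),
        sec2: Channel.of('c'),
    ]

    // Tag every item with its section name and parameters: one channel per section
    def tagged = section_inputs.collect { name, ch ->
        ch.map { item -> tuple(name, section_params[name], item) }
    }

    // Merge all tagged channels pairwise with mix
    def combined = tagged.inject { acc, ch -> acc.mix(ch) }

    // A single process call handles every section
    TAG_SECTION(combined)

    // Split the results back out by section name
    TAG_SECTION.out
        .branch { section, outfile ->
            sec1: section == 'sec1'
            sec2: section == 'sec2'
        }
        .set { results }

    results.sec1.view { section, outfile -> "sec1 result: ${outfile.name}" }
    results.sec2.view { section, outfile -> "sec2 result: ${outfile.name}" }
}
```

Adding a third section then only requires a new entry in each map and a new label in the `branch` closure; the process call itself stays unchanged.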

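Alternatively, if the number of elements in the tuple starts to get unwieldy, another approach is to supply a Map object containing the desired configuration properties, e.g.: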

process my_proc {

    input:
    tuple val(config), path(input_file)
    path reference
    path file1_txt
    path file2_txt

    output:
    tuple val(config), path("${input_file}.out")

    """
    touch "${input_file}.out"
    """
}

workflow {

    ...

    def proc_config = [
        'sec1': [
            section_name: 'section1',
            vc_params: parameters.getSubParam("VC", "section1"),
        ],
        'sec2': [
            section_name: 'section2',
            vc_params: parameters.getSubParam("VC", "section2"),
        ]
    ]

    section1_ch = splittingChannel_s1.map { tuple( proc_config["sec1"], it ) }
    section2_ch = splittingChannel_s2.map { tuple( proc_config["sec2"], it ) }

    my_proc(
        section1_ch.mix( section2_ch ),
        reference,
        resources.resolveOtherResourceFile("file1.txt"),
        resources.resolveOtherResourceFile("file2.txt"),
    )

    my_proc.out
        .branch { config, outfile ->
            section1: config.section_name == "section1"
            section2: config.section_name == "section2"
        }
        .set { results }

    results.section1.view { config, outfile ->
        "Section1 results ${config.section_name}: ${outfile}"
    }
    results.section2.view { config, outfile ->
        "Section2 results ${config.section_name}: ${outfile}"
    }
}
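
The process above only touches an output file, so it may not be obvious how the Map is actually consumed. As a rough sketch (not part of the original answer), the fields of a `val` Map input can be interpolated directly in the script block. `SHOW_CONFIG`, the `min_depth=...` strings, and the `Channel.of(...)` sample names are hypothetical placeholders for the real process, parameters, and input channels.

```groovy
// main.nf: illustrative sketch, run with `nextflow run main.nf`

process SHOW_CONFIG {
    input:
    tuple val(config), val(sample)

    output:
    tuple val(config), stdout

    script:
    // Map fields are resolved by Groovy before the shell command runs
    """
    echo "sample=${sample} section=${config.section_name} params=${config.vc_params}"
    """
}

workflow {
    // Hypothetical configuration; the real values would come from parameters.getSubParam(...)
    def proc_config = [
        sec1: [ section_name: 'section1', vc_params: 'min_depth=10' ],
        sec2: [ section_name: 'section2', vc_params: 'min_depth=30' ],
    ]

    // Placeholders for splittingChannel_s1 / splittingChannel_s2
    def section1_ch = Channel.of('a', 'b').map { tuple(proc_config.sec1, it) }
    def section2_ch = Channel.of('c').map { tuple(proc_config.sec2, it) }

    SHOW_CONFIG(section1_ch.mix(section2_ch))

    // The config Map travels with each result, so the section can be read back downstream
    SHOW_CONFIG.out.view { config, text -> "${config.section_name}: ${text.trim()}" }
}
```

Because the Map is passed through to the output tuple, downstream steps can filter or branch on any of its keys without extra bookkeeping.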
