Nextflow基于数据流编程模型,其中流程通过通道进行通信。

通道具有两个主要属性:

  1. 发送消息是一个异步操作,无需等待接收过程即可立即完成。
  2. 接收数据是一项阻止操作,它将停止接收过程,直到消息到达为止。

通道类型

Nextflow区分两种不同的通道:队列通道和值通道(queue channels and value channels )。

queue channels

队列通道是连接两个 processes 或 operators 非阻挡单向FIFO队列。

通常使用工厂方法(如fromfromPath等)创建队列通道,或使用mapflatMap等通道操作符将其链接起来。

队列通道也是由使用into子句的流程输出声明创建的。

该定义意味着同一队列通道不能用作进程输出多次,而不能用作进程输入一次。

在需要将一个流程输出通道连接到多个流程的情况下,一个流程或操作员可以使用 into运算符创建同一通道的两个(或多个)副本,并使用每个副本来连接一个单独的流程。

Value channel

根据定义,值通道(又称单例通道)绑定到一个值,并且可以无限制地读取该值而不消耗其内容。

因此,一个值通道可以通过多个 process 用作输入。

使用工厂方法或由操作员返回单个值来创建一个值通道,例如firstlastcollectcountminmaxreducesum

当输入在from子句中指定简单值时,进程将隐式创建一个值通道。此外,还将为输入仅是值通道的过程隐式创建一个值通道作为输出。

例如:

process foo {
input:
val x from 1
output:
file 'x.txt' into result

"""
echo $x > x.txt
"""
}

以上代码段中的过程声明了一个输入,该输入隐式为一个值通道。因此,result输出也是一个可以通过多个过程读取的值通道。

了解多个输入通道的工作方式

流程的关键特征是能够处理来自多个通道的输入。

当将两个或多个通道声明为过程输入时,过程将停止,直到存在完整的输入配置。它从所有声明为输入的通道中接收输入值。

验证此条件后,它将捕获来自各个通道的输入值,并生成任务执行,然后重复相同的逻辑,直到一个或多个通道不再有内容。

这意味着通道值是一个接一个地连续捕获的,即使其他通道中还有其他值,第一个空通道也会导致进程执行停止。

例如:

process foo {
echo true
input:
val x from Channel.from(1,2)
val y from Channel.from('a','b','c')
script:
"""
echo $x and $y
"""
}

该过程foo执行两次,因为第一个输入通道仅提供两个值,因此c元素被丢弃。它打印:

1 and a
2 and b

当使用Value通道(又称为Singleton通道)时,将应用不同的语义。

这种通道是通过Channel.value工厂方法创建的,或者在流程输入在from子句中指定简单值时隐式创建的。

根据定义,值通道绑定到单个值,并且可以无限制地读取该值而不消耗其内容。

这些属性使得将值通道与一个或多个(队列)通道混合时,不会影响仅取决于其他通道的过程终止,并且其内容会重复应用。

为了更好地理解此行为,请将前面的示例与以下示例进行比较:

process bar {
echo true
input:
val x from Channel.value(1)
val y from Channel.from('a','b','c')
script:
"""
echo $x and $y
"""
}

上面的代码段执行bar过程三次,因为第一个输入是一个值通道,因此可以根据需要读取其内容多次。进程终止由第二通道的内容确定。它打印:

1 and a
1 and b
1 and c

Channel factory

可以通过 process 输出声明隐式创建通道,也可以使用以下通道工厂方法显式创建通道。

对于版本20.07.0,channel.已将前缀作为的别名引入Channel.,因此工厂方法可以与语法channel.from()和一起使用Channel.from(),依此类推。

create

此方法已弃用,在DSL2语法中将不可用。

使用方法创建一个新频道create,如下所示:

channelObj = Channel.create()

of

of方法允许您创建一个通道,该通道给出方法参数的值,例如:

ch = Channel.of( 1, 3, 5, 7 )
ch.view { "value: $it" }

本示例中的第一行创建一个变量ch,该变量保存一个通道对象。该通道发出在of方法中指定为参数的值。因此,第二行显示以下内容:

value: 1
value: 3
value: 5
value: 7

值的范围会相应扩大。

Channel
.of(1..23, 'X', 'Y')
.view()

显示:

1
2
3
4
:
23
X
Y

此功能需要更高版本的Nextflow 19.10.0。

from

不推荐使用此方法,仅应将其用于旧代码中的向后兼容性。使用offromList代替。

from方法会创建一个通道,该通道给出指定为方法参数的值,例如:

ch = Channel.from( 1, 3, 5, 7 )
ch.subscribe { println "value: $it" }

本示例中的第一行创建一个变量ch,该变量保存一个通道对象。

该通道发出在from方法中指定为参数的值。因此,第二行将打印以下内容:

value: 1
value: 3
value: 5
value: 7

以下示例显示了如何根据一系列数字或字符串创建频道:

zeroToNine = Channel.from( 0..9 )
strings = Channel.from( 'A'..'Z' )

请注意,当from参数是实现(Java)Collection接口的对象时 ,结果通道将collection 元素单独给出。

因此,以下两种声明即使在第一种情况下将项目指定为多个参数,而在第二种情况下将其指定为单个列表对象参数,则产生的结果相同:

Channel.from( 1, 3, 5, 7, 9 )
Channel.from( [1, 3, 5, 7, 9] )

因此,以下示例创建一个通道,该通道发出三个条目,每个条目都是一个包含两个元素的列表:

Channel.from( [1, 2], [5,6], [7,9] )

value

value工厂方法用于创建一个值的信道。

null可以指定一个可选的not 参数,以将通道绑定到特定值。例如:

expl1 = Channel.value()
expl2 = Channel.value( 'Hello there' )
expl3 = Channel.value( [1,2,3,4,5] )

示例中的第一行创建一个“空”变量。

第二行创建一个通道并将一个字符串绑定到该通道。

最后一个创建一个通道,并将一个列表对象绑定到该通道,该列表对象将作为唯一的通道发出。

fromList

fromList方法创建一个通道,该通道发出作为元素列表提供的值,例如:

Channel
.fromList( ['a', 'b', 'c', 'd'] )
.view { "value: $it" }

印刷品:

a
b
c
d

此功能需要更高版本的Nextflow 19.10.0。

fromPath

fromPath方法指定路径字符串作为参数来创建给出一个或多个文件路径的通道。例如:

myFileChannel = Channel.fromPath( '/data/some/bigfile.txt' )

注意:它不检查文件是否存在。

每当fromPath参数包含*?通配符时,它将被解释为全局路径匹配器。例如:

myFileChannel = Channel.fromPath( '/data/big/*.txt' )

本示例创建一个通道,并发出Path与文件夹中带有txt扩展名的/data/big文件一样多的项目。

两个星号(即**)的工作方式类似*,但跨越目录边界。

此语法通常用于匹配完整路径。圆括号指定子模式的集合。

例如:

files = Channel.fromPath( 'data/**.fa' )
moreFiles = Channel.fromPath( 'data/**/*.fa' )
pairFiles = Channel.fromPath( 'data/file_{1,2}.fq' )

第一行返回一个通道,该通道给出data文件夹中以后缀.fa结尾的文件,并在其所有子文件夹中递归。

第二个只给出具有在data路径的任何子文件夹中具有相同后缀的文件。

最后一个示例给出两个文件:data/file_1.fqdata/file_2.fq

与在Linux Bash中一样,*通配符与隐藏文件(即,名称以.字符开头的文件)不匹配。

隐藏文件

为了包括隐藏文件,您需要以句点字符开头或指定选项。例如:hidden: true

expl1 = Channel.fromPath( '/path/.*' )
expl2 = Channel.fromPath( '/path/.*.fa' )
expl3 = Channel.fromPath( '/path/*', hidden: true )

第一个示例返回指定路径中的所有隐藏文件。

第二个返回所有以.fa后缀结尾的隐藏文件。

最后一个示例返回该路径中的所有文件(隐藏和非隐藏)。

目录路径

默认情况下,全局模式仅查找符合指定条件的常规文件路径,即它不会返回目录路径。

可以使用type指定值的参数filedirany为了定义所需的路径。例如:

myFileChannel = Channel.fromPath( '/path/*b', type: 'dir' )
myFileChannel = Channel.fromPath( '/path/a*', type: 'any' )

第一个示例将返回所有以后缀结尾的目录路径b

第二个示例将返回以a前缀开头的任何文件和目录。

可选参数表

名称 描述
glob true解释字符*?[]{}作为全局通配符,否则处理它们的正常字符(默认值:true
type 键入的返回路径,无论是filedirany(默认值:file
hidden true包括在所得到的路径隐藏文件(默认值:false
maxDepth 要访问的最大目录级别数(默认值:无限制)
followLinks 如果为true,则在遍历目录树时会跟随符号链接,否则将它们作为文件进行管理(默认值:true)
relative true返回路径是相对于最顶层的公共目录(默认值:false
checkIfExists 如果为true,则在文件系统中不存在指定路径的异常(默认值:false

可以使用列表作为参数指定多个路径或全局模式:

Channel.fromPath( ['/some/path/*.fq', '/other/path/*.fastq'] )

fromFilePairs

fromFilePairs方法创建一个通道,该通道发出与用户提供的全局模式匹配的文件对。匹配文件以元组的形式发出,其中第一个元素是匹配对的分组键,第二个元素是文件列表(按字典顺序排序)。例如:

Channel
.fromFilePairs('/my/data/SRR*_{1,2}.fastq')
.println()

它将产生类似于以下内容的输出:

[SRR493366, [/my/data/SRR493366_1.fastq, /my/data/SRR493366_2.fastq]]
[SRR493367, [/my/data/SRR493367_1.fastq, /my/data/SRR493367_2.fastq]]
[SRR493368, [/my/data/SRR493368_1.fastq, /my/data/SRR493368_2.fastq]]
[SRR493369, [/my/data/SRR493369_1.fastq, /my/data/SRR493369_2.fastq]]
[SRR493370, [/my/data/SRR493370_1.fastq, /my/data/SRR493370_2.fastq]]
[SRR493371, [/my/data/SRR493371_1.fastq, /my/data/SRR493371_2.fastq]]

全局模式必须至少包含一个星号通配符。

或者,可以实现自定义文件对分组策略,以提供一个闭包,在给定当前文件作为参数的情况下,该闭包返回分组密钥。例如:

Channel
.fromFilePairs('/some/data/*', size: -1) { file -> file.extension }
.println { ext, files -> "Files with the extension $ext are $files" }

可用的可选参数表:

名称 描述
type 键入的返回路径,无论是filedirany(默认值:file
hidden true包括在所得到的路径隐藏文件(默认值:false
maxDepth 要访问的最大目录级别数(默认值:无限制)
followLinks true它遵循在目录树的遍历符号链接,否则会被管理的文件(默认:true
size 定义每个发出的项目应保留的文件数(默认值:2)。设置-1为任意。
flat true发出的元组中将匹配文件作为唯一元素生成时(默认值:)false
checkIfExists true抛出指定路径的例外在文件系统中不存在(默认值:false

可以使用列表作为参数指定多个glob模式:

Channel.fromFilePairs( ['/some/data/SRR*_{1,2}.fastq', '/other/data/QFF*_{1,2}.fastq'] )

fromSRA

fromSRA方法查询NCBI SRA数据库并返回一个通道,该通道发出与指定标准(即项目或登录号)匹配的FASTQ文件。例如:

Channel
.fromSRA('SRP043510')
.println()

它返回:

[SRR1448794, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/004/SRR1448794/SRR1448794.fastq.gz]
[SRR1448795, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/005/SRR1448795/SRR1448795.fastq.gz]
[SRR1448792, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/002/SRR1448792/SRR1448792.fastq.gz]
[SRR1448793, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/003/SRR1448793/SRR1448793.fastq.gz]
[SRR1910483, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/003/SRR1910483/SRR1910483.fastq.gz]
[SRR1910482, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/002/SRR1910482/SRR1910482.fastq.gz]
(remaining omitted)

可以使用列表对象指定多个登录ID:

ids = ['ERR908507', 'ERR908506', 'ERR908505']
Channel
.fromSRA(ids)
.println()
[ERR908507, [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908507/ERR908507_1.fastq.gz, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908507/ERR908507_2.fastq.gz]]
[ERR908506, [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908506/ERR908506_1.fastq.gz, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908506/ERR908506_2.fastq.gz]]
[ERR908505, [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908505/ERR908505_1.fastq.gz, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908505/ERR908505_2.fastq.gz]]

在后台,它使用NCBI ESearch API,因此该fromSRA方法允许使用此API支持的任何查询词。

可用的可选参数表:

名称 描述
apiKey NCBI用户API密钥。
cache 启用/禁用缓存API请求(默认值:)true
max 可以重试的最大条目数(默认值:无限制)。

要访问NCBI搜索服务, 应提供以下NCBI API密钥

  • 使用apiKey可选参数,Channel.fromSRA(ids, apiKey:'0123456789abcdef')
  • NCBI_API_KEY在您的环境中导出变量,例如。export NCBI_API_KEY=0123456789abcdef

此功能需要Nextflow 19.04.0或更高版本。

watchPath

watchPath方法监视文件夹中是否有匹配指定模式的一个或多个文件。一旦存在满足指定条件的文件,该文件就会通过该watchPath 方法返回的通道发出。可以通过使用*?通配符(即通过指定全局路径匹配条件)来指定要监视的文件上的条件。

例如:

Channel
.watchPath( '/path/*.fa' )
.subscribe { println "Fasta file: $it" }

默认情况下,它仅监视在指定文件夹中创建的新文件。可选地,可以提供第二个参数来指定要观看的事件。支持的事件是:

名称 描述
create 创建一个新文件(默认)
modify 文件被修改
delete 文件被删除

您可以使用逗号分隔的字符串来指定多个这些事件之一,如下所示:

Channel
.watchPath( '/path/*.fa', 'create,modify' )
.subscribe { println "File created or modified: $it" }

watchPath工厂无休止地等待为符合指定模式和事件(一个或多个)的文件。因此,无论何时在脚本中使用它,生成的管道都将永远不会结束。

empty

根据empty定义,工厂方法将创建一个不发出任何值的通道。

绑定值

由于在Nextflow中,通道是使用数据流变量或队列实现的。因此,发送消息等效于将值绑定到表示通信通道的对象。

绑定

通道对象提供 bind() 方法,这是通过通道发送消息的基本操作。例如:

myChannel = Channel.create()
myChannel.bind( 'Hello world' )

operator <<

运算符<<只是bind方法的语法糖。因此,下面的示例产生与上一个相同的结果:

myChannel = Channel.create()
myChannel << 'Hello world'

观察事件

subscribe

subscribe方法允许每次源通道发出新值时执行用户定义功能。

发出的值隐式传递给指定的函数。例如:

// define a channel emitting three values
source = Channel.from ( 'alpha', 'beta', 'delta' )

// subscribe a function to the channel printing the emitted values
source.subscribe { println "Got: $it" }
Got: alpha
Got: beta
Got: delta

形式上,用户定义的函数是Nextflow脚本所基于Closure的Groovy编程语言所定义的。

如果需要,可以使用除之外的其他名称it(可选地指定期望值的类型)来明确定义闭包参数,如以下示例所示:

Channel
.from( 'alpha', 'beta', 'lambda' )
.subscribe { String str ->
println "Got: ${str}; len: ${str.size()}"
}
Got: alpha; len: 5
Got: beta; len: 4
Got: lambda; len: 6

onNext,onComplete和onError

subscribe方法可以接受以下一个或多个事件处理程序:

  • onNext:注册一个在通道发出值时调用的函数。这在上面的示例中所述的带普通闭包的使用相同。
  • onComplete:注册一个函数,该函数在通道发出最后一个值后调用。
  • onError:注册在处理onNext事件时引发异常时调用的函数 。不会再呼叫onNextonComplete。该onError方法将Throwable导致错误的参数作为其参数。

例如:

Channel
.from( 1, 2, 3 )
.subscribe onNext: { println it }, onComplete: { println 'Done' }
1
2
3
Done