用Kotlin实现一个超简陋的BitTorrent客户端(二)
于是上次提到的那个客户端的进展……大概是完工了(。这里记录一下第二阶段踩的一些坑,一些关于BitTorrent协议的有趣的地方,也有一些我自己觉得值得记录下来的一些实现细节吧。
续篇
首先提一下在解析PeerResponse的时候,遇到了值的类型为List<Result<*>>
但是需要获取Result<List<*>>
的情况。这个是很经典的Result和List的互操作了,于是导入了库里面的sequenceLeft(list:)
函数。原来的库用的是自行实现的链表数据结构,但是稍微改造一下就可以把函数用在Kotlin内建的List上了。
1 |
|
注意这里的fold要显式注明参数类型。
于是又接着继续写下去了。本着从简单到难的原则,首先实现了几个data class。
BitTorrentMessage
这个data class代表一个在peer之间传来传去的数据,Message的数据结构基于这里的描述。注意这个类的
toString
方法被重载了,首先会生成包含用4个Byte encode的消息长度,然后是一个Byte表示的Message Id,紧接着是这个Message的payload。重载的方法是用的
List<Byte>.map { it.toChar() }.joinToString("")
,这样的写法可以安全地把字符串按照每个字符的字面量来串接起来。Block
这个类是对每个分块(最小的传输单元)的简单包装,加上了piece、offset、length等数据,以及一个代表该Block是否已经传输完毕的Status。
Piece
这个类表示由一组分块组成的分片,每个分片对应着pieces hash里面的一个hash值。这个值会在该分片下载完毕后和该分片的hash值进行匹配。如果不匹配的话,每个分块都会被重新标pending然后重新回到下载流程。注意这里的getData()方法只需要把每个block的data join起来就行了。
这里在进行hash值匹配的时候有一个坑,之前提取的hashValue虽然长度也是20,但是底层的字符数组的长度变成了40,每一个byte后面紧跟一个0。所以跟
getData().hash().hexDecode()
后的值(字符数组长度为20)不匹配了。对hashValue使用map { it.toByte() }
可以消除这些0,不过这样一来两边都要变成字符数组来进行匹配。至于为什么会多出一堆0来,还是跟字符串的encoding有关吧。
顺便解释一下BitTorrent里面分片和分块的概念。用BitTorrent下载文件的时候,数据被分成大小相等(除了最后一个)的若干个分片,每个分片的大小由我们之前解析的info字典里的piece length指定。但是,实际进行传输的时候使用的是更小的分块,因此每个分片又可以被细分为若干个大小相等(除了最后一个)的分块。分块的大小一般是
2.pow(14)
也就是16384字节。
和Peers沟通
从Tracker那里得到peer的信息后,接下来要做的自然就是和每个peer建立连接请求下载数据。这里建立TCP连接就直接用了Ktor的Socket库了。这个Socket附带着一个ByteReadChannel和一个ByteWriteChannel,因为只能各打开一次,所以就在初始化socket的时候把它们顺便也开启了。
注意openWriteChannel()
的时候要把autoFlush设为true。
在发送数据的时候,按照老样子,要用map { it.code.toByte() }
获取字节数组,然后可以转换为ByteArray以便和writeAvailable()
的函数签名对得上。
在接受数据的时候,有两种情况,一种是已经知道要接受多少数据了(在握手的时候),只要按照这个大小建立一个buffer然后传给ByteReadChannel去读取即可。另一种情况是不知道要读取多少数据,这时候就先读前4个字节,获取一个Int(接下来要读的长度),然后根据这个长度去开buffer。注意无论哪种情况在Ktor里面都要readFully()
,不然channel里面残留的字节会在下一次读取的时候被读到,然后导致接下来读的都是错的。
BitTorrent握手
建立好TCP连接后,就可以向Peer发送一个BitTorrent的handshake消息了。这个消息包括5个部分:
- 协议头长度,永远是19(0x13)。
- 协议头,永远是
BitTorrent protocol。
- 8个保留字符,用于指示是否使用某些扩展协议。但是这里不支持任何扩展协议,所以全部写
0x0
就可。 - 先前计算得出的
infohash
,用于指示想要哪些文件。 - 客户端自己的Peer ID
一个handshake消息可能长这样:
1 |
|
这里就使用了iterate
函数来生成Peer
ID最后的12位随机字串了。iterate
的原理大概是给定一个种子、一个变换函数和一个长度生成一个列表,和随机数的生成原理相近。
然后就是发送握手消息,如果一切顺利的话应该马上就可以收到Peer回复相同格式的消息,接下来比对两方的info_hash是否一致,再进行下一步。
交换信息
完成handshake之后,就可以发送和接受消息了……还不能,除非对方已经做好了准备。这个时候,客户端就可以被看作是Choked了。只有对方给客户端发送一个Unchoke之后,才可以向对方请求数据。要让对方把这边Unchoke的话,需要先向对方发送一条Interested信息。
除此之外,客户端还会收到一条BitField消息,这条消息会告诉这边对方有哪些分片(Pieces)。BitField的存储方式就是一条ByteArray,如果第index个bit的值为1,则说明对方拥有这个分片。如果值为0,就说明对方没有这个分片。
整个流程大约如下(不一定准确,因为貌似后面还有坑)
1 |
|
有关建立连接、和Peer握手、进行信息传输的部分实现在Worker
类里面。这里有一个点是socket和那两个channel如果创建失败,会返回null,所以不适合用lateinit
var而要直接用nullable,如果是null,就不需要关闭了。
发起请求
一个BitTorrent Request的结构如下:
1 |
|
这里的index是指明要下载的是第几个Piece。begin,或者说offset,则是这个Block在Piece里的起始位置,length是这个Block的实际大小。
比如说,要下载一个大小为135168字节的文件,每个Piece长度49152,那么要发送的Request如下:
1 |
|
之前提到的info字典表里面的pieces,就是每一个piece的SHA1 hash值拼接在一起的结果。想要确认是否下载到了正确的分块,只要计算一下它的Hash并和这个Hash值进行比较即可。
这里有几个需要注意的点:
Java内部使用的是网络序,就不需要像C/C++那样用htonl来进行转换了。
然后是个大坑:我一开始用的代码是类似这样的:
1
2
3
4val index = block.piece.intToBytes().map { it.toByte() }
val offset = block.offset.intToBytes().map { it.toByte() }
val length = block.length.intToBytes().map { it.toByte() }
val payload = (index + offset + length).joinToString("")这里有两个坑:
index
,offset
,length
长度不一定为完整的Int(4个字节),如果值很小的话可能只有1、2个字节,就会出现长度不一致。这里需要补前导0。joinToString()
函数会把字面量直接变为字符串,比如说[0x64,0x0]
会直接变成"640"
解决方案是写一个
expandToByteInts()
函数来补前导0,然后把转换得到的List换成Char的List,再join。这个思路模仿了Long.expandWithRem(divideBy:)
的写法(后面有提到)1
2
3
4
5
6fun Int.expandToByteInts(): List<Int> = unfold(this to 4) {
if (it.second == 0)
null
else
it.first % 256 to (it.first / 256 to it.second - 1)
}.reversed()有关fold/unfold/iterate这几个辅助函数的实现可以参考
utils/fp
包。这几个函数提供了把许多操作写成FP的写法的可能性。
管理Pieces
有了这些信息,还要去管理记录到底哪些Peer有哪些Piece,已经下载了哪些Piece还有还需要下载哪些Piece,所以需要一个PieceManager。这个PieceManager的主要任务是:
- 创建并存储所有的Piece对象
- 创建待下载的文件
- 在和新的Peer进行连接/断开连接的时候添加/更新/移除对应的Peer的信息(ID和BitField)
- 记录已下载的Block,在一个Piece的所有Block都下载完成的时候检查它的SHA1 Hash是否与info里面的值匹配,如果匹配,就把Piece写入文件
- 找到下一个需要下载的Block
- 检查文件是否已经下载完成
除此之外,也提供追踪进展的功能。
这里需要注意的有几点:
使用了RandomAccessFile(因为可以随机在不同的offset上进行读写),在初始化的时候要
setLength
为种子文件里面提取出的length值。在每次写入操作前要先计算出写入文件的offset(Piece的index乘以pieceLength),然后seek
到这个offset再写入。这里的offset并不是RandomAccessFile.write(data:off:len)
里面的offset,后者指的是对于写入的data的offset。因为要对Piece对象调用各种方法,所以AtomicReference是不能用的了。需要使用到Mutex锁。但是与此同时,也需要小心Mutex互锁的情况。例如:
1
2
3
4
5
6
7
8
9
10
11
12suspend fun isComplete(): Boolean = mutex.withLock {
...
}
suspend fun displayProgressBar() {
mutex.withLock {
...
if (isComplete()) {
...
}
}
}这里的
isComplete
会把mutex锁住,然后在displayProgressBar()
里又有一个大锁,结果就是程序被彻底锁死。解决方案是在displayProgressBar()
里不用withLock
而是把lock的过程拆开,在进入函数的时候上锁,在调用isComplete()
前把锁解开。在
blockReceived()
函数里面有个从pendingRequests里移除包含当前Block的Request的过程,但是如果pendingRequests里面不包含当前的Block也不应该报错,因为当前的Block很可能并不是pending的。关于获取下一个Block的策略,大致上是这样的:
- 如果之前请求的Block超时了,重新请求
- 请求现在正在下载的Piece里的下一个Block
- 如果当前没有正在下载的Piece,那么找到最「稀有」(被最少Peer所拥有)的Piece并请求下载
这里在
initiatePieces()
里面定义了一个Long.expandWithRem(divideBy:)
函数,主要是简单调用unfold来expand一个整数到一个列表,这个列表的每一项(除了最后一项)都是除数然后最后一项是余数,把这个列表的每一项加起来可以得到这个整数。例如,17 expandWith 5 可以得到 [5,5,5,2]。这里的注意事项是最后一项如果是0的时候应该被吃掉。然后就可以利用这个函数来生成Piece和对每个Piece生成Block了。
在
nextRequest()
函数里面,为了方便流式调用,把nullable转换成Option然后再转回来,这样并不是最好的写法,而且因为只在这个地方使用Option,所以需要写好几个帮助函数来根据不同的情况生成对应的各种Option值。不过这样一来,函数本体就可以很简单地这样写出来了:1
2
3
4
5
6
7mutex.withLock {
return missingPiecesEmpty().flatMap { cld ->
peersContainsIdEmpty().flatMap {
expiredRequestOpt().mapNone(::nextOngoingOpt).mapNone { getRarestPieceOpt(Option.Some(cld)) }
}
}.toNullable()
}比一堆if套娃可读性要高不知道哪里去了(bushi转换函数大概都长这样,就是简单地生成一个Option,作为桥接器:
1
2
3
4fun expiredRequestOpt(): Option<Block> = when (val er = expiredRequest(peerId)) {
null -> Option.None
else -> Option.Some(er)
}
多线程管理
既然上了Kotlin怎么能不用可爱的协程呢?(划掉
有几个不同的地方用了协程的不同特性:
PeerRetriever生产Peer并push到一个队列中,然后每个Worker从队列里提取Peer,这是一个生产-消费模型,用Channel就可以很好的协调起来了。
首先在TorrentClient里生产Peer:
1
2
3
4
5
6PeerRetriever(...)
.retrievePeers(...)
.let {
it.first.forEach { peers.send(it) }
// 这里的第一个it是个Peer的列表对Interval的Pair,取first
}然后把peers传给每一个Worker,
最后在Worker的主入口函数里消费Peer:
peer = peers.receive()
。Channel的容量设置为
Channel.UNLIMITED
,因为不想在生产端进行阻塞。每一个Worker都是一个协程,这一点做起来很简单,Worker的
start()
函数本就是suspend的,只需要在workers.onEach()
里面的it.start()
外面包裹一层scope就可以了:1
2
3CoroutineScope(Dispatchers.IO).launch {
it.start()
}当然也可以不包裹起来,这时候就是单线程模式,第一个协程下载完毕后才会交出控制权然后其他协程依次被启动。
单线程对于debug相对多线程要友好许多(雾
进度条也在一个单独的协程里,实现起来只要简单地把
pieceManager.trackProgress()
包裹在一个CoroutineScope下就可以了。
当然了,启动协程后还需要各线程之间进行交流,控制每个Worker在未完成下载的时候进行循环,在循环里反复请求下载下一个块,最后下载完毕后退出。这些主要是通过Worker类里面的terminated
、requestPending
等flag,PieceManager里的isComplete()
来进行控制的。
相对应的,PieceManager里面的missingPieces
、onGoingPieces
、finishedPieces
、pendingRequests
都使用了ConcurrentLinkedDeque,配合Mutex来避免产生race
condition。
记录日志
C++里面的loguru感觉很好用,但是Java里没有对应的库。对应的,选用了Logback库,配置xml文件参考了这里还有这里。Logback在打日志的时候会把自己的记录也打印出来,要关闭的话可以参考这里,不过必须把debug调为false才行。
然后为了能被全局开启关闭,就写了个带参单例Log类,在Log类里面设置了个flag(根据这里的提示设为@Volatile
),然后把info/warn/error/debug分别包装起来。在Kotlin里面实现单例模式很简单,只要所有东西都写在companion object
里面就好了。
这里有个看起来不是很漂亮的地方是Log里重复写了4次几乎相同的函数,但是考虑到反射的性能开销还有Log是高频调用的开销,就觉得还是这样好了(
关于Tracker的回复……Compact或者不是
想了很久没有想到怎样用orElse或者getOrElse来串接non-compact和compact的运算逻辑,最后还是添加了一个Result.isSuccess()
方法来提取尝试提取non-compact的结果,如果提取成功就按照non-compact来走,不然就提取compact的结果并且开始解码。
具体的实现倒是平淡无奇,只是简单地照着描述去实现而已,不过感觉写起来相当的丑啊(
记一个很隐蔽的bug的发现过程
起因是下载的时候总是进度为0,开了日志后发现虽然有不断调用下一个Piece,但是offset永远为0。
- 检查nextOngoing函数,怀疑是peer的BitField存错了(因为看到了一堆-1)
- 首先检查addPeer函数,发现问题不在这里
- 然后检查hasPiece函数,发现也没问题,而且-1实际上就是
ff
,换句话说就是每一个Piece都有的意思 - 回过头再看nextOngoing函数,给这个函数打断点,这里必须拆分出next变量并判断变量是否为空,然后在if和else里分别打断点了
- 发现了两个问题
- nextOngoing里面的linq可能写错了,因为
firstOrNull { ba.hasPiece(it.index) }
只测试这个Piece被Peer所拥有,但是这个Piece不一定有Missing块 - pendingRequest总是不会把每一个Piece的第一项包含进去
- nextOngoing里面的linq可能写错了,因为
- 仔细想的话,这里的问题是在pendingRequest不包含每个Piece的第一项所以它会一直处于Pending但是无法被提取的状态。如果所有的Pending的Block都在pendingRequest队列里面的话,那么确实不需要filter if any is Missing,反过来说,即使filter了any is Missing,仍然会有Piece因为含有Pending而被pass掉……
- 最后的解决方案是在
getRarestPieceOpt()
里面加了一行?.also { block -> block.appendToPendingRequests() }
。同时,把pending的间隔调短为1秒(因为会反复调取来下载在队列中但未被完全下载的块,所以间隔不能调太长)
尾声
基本上到这里就差不多了。断断续续修了几个bug改了一些东西又花了一些时间,现在的客户端可以用来下载小型文件,大文件的话,还不确定能不能跑起来。
因为算是第一次接触网络编程还有用协程来写,所以感觉磕磕碰碰的。代码方面,倒是试图利用linq和一些基础的FP的概念来让代码可读性变得更好一点。不过感觉最重要的还是对网络还有协议的理解了。除此之外,Kotlin/Java的字符串的实现和C/C++的很不一样,用起来坑也很多,尤其是对于Bencode这种把二进制结构和字符串缝合在一起的数据结构。不夸张地说,这个项目里起码三分之一的时间都用来处理字符串相关的问题了。
实现的功能也就只有最基本的下载,不支持断点续传、多个文件下载以及做种,也没有pipeline。不过两周时间内磕磕碰碰写出一个能跑的客户端还是很有成就感的w
已知的问题
- 和特定的Peer连接的时候Handshake会失败(socket试图读68个字节但读了个EOF)
- 线程没吃满
- 建立TCP时候的Timeout太长了