概述
包 gopacket 为 Go 语言提供数据包解码功能。
gopacket 包含多个带有额外功能的子包,包括:
layers:每次都可能使用该子包。它包含内置于 gopacket 的用于解码数据包协议的逻辑。注意,下面的所有示例代码假定已经导入 gopacket 和 gopacket/layers。
pcap:使用 libpcap 从网络读取数据包的 C 绑定。
pfring:使用 PF_RING 从网络读取数据包的 C 绑定。
afpacket:从网络上读取数据包的 Linux AF_PACKET 的 C 绑定。
tcpassembly:TCP 流重组。
此外,如果打算直接编写代码,那么请查看 examples (https://github.com/google/gopacket/tree/master/examples)子目录,其中包含许多使用 gopacket 库构建的简单二进制示例。
由于 x/sys/unix 依赖,pcapgo/EthernetHandle、afpacket 和 bsdbpf 至少需要 Go 1.7。除此之外,所需的最小 Go 版本是 1.5。
基本应用
gopacket 以 []byte 的形式接收数据包数据,并且将其解码为具有非零“层”数的数据包。每层对应于字节中的一个协议。解码数据包后,可以从数据包中请求数据包的层。
// Decode a packet
packet := gopacket.NewPacket(myPacketData, layers.LayerTypeEthernet, gopacket.Default)
// Get the TCP layer from this packet
if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil {
fmt.Println("This is a TCP packet!")
// Get actual TCP data from this layer
tcp, _ := tcpLayer.(*layers.TCP)
fmt.Printf("From src port %d to dst port %d\n", tcp.SrcPort, tcp.DstPort)
}
// Iterate over all layers, printing out each layer type
for _, layer := range packet.Layers() {
fmt.Println("PACKET LAYER:", layer.LayerType())
}
// Decode an ethernet packet
ethP := gopacket.NewPacket(p1, layers.LayerTypeEthernet, gopacket.Default)
// Decode an IPv6 header and everything it contains
ipP := gopacket.NewPacket(p2, layers.LayerTypeIPv6, gopacket.Default)
// Decode a TCP header and its payload
tcpP := gopacket.NewPacket(p3, layers.LayerTypeTCP, gopacket.Default)
从源读取数据包
一旦拥有 PacketSource,可以以多种方式从其中读取数据包。请查看 PacketSource 的文档,了解更多细节。最简单的方式是 Packets 函数,该函数返回一个 Channel,然后异步地将数据包写进该 Channel,如果 packetSource 达到文件结束(end-of-file),则关闭 Channel。
packetSource := ... // construct using pcap or pfring
for packet := range packetSource.Packets() {
handlePacket(packet) // do something with each packet
}
可以通过设置 packetSource.DecodeOptions 中的字段更改 packetSource 的解码选项...查看下面的部分,了解更多细节。
惰性解码
// Create a packet, but don't actually decode anything yet
packet := gopacket.NewPacket(myPacketData, layers.LayerTypeEthernet, gopacket.Lazy)
// Now, decode the packet up to the first IPv4 layer found but no further.
// If no IPv4 layer was found, the whole packet will be decoded looking for
// it.
ip4 := packet.Layer(layers.LayerTypeIPv4)
// Decode all layers and return them. The layers up to the first IPv4 layer
// are already decoded, and will not require decoding a second time.
layers := packet.Layers()
无拷贝解码
默认情况下,gopacket 将拷贝传递给 NewPacket 的切片,在数据包内存储该拷贝。因此对切片下层的字节的修改不会影响数据包及其层。如果可以保证不更改底层切片字节,那么使用 NoCopy 告诉 gopacket.NewPacket,它将使用被传入的切片本身。
// This channel returns new byte slices, each of which points to a new
// memory location that's guaranteed immutable for the duration of the
// packet.
for data := range myByteSliceChannel {
p := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy)
doSomethingWithPacket(p)
}
已知层的指针
// Get packets from some source
for packet := range someSource {
if app := packet.ApplicationLayer(); app != nil {
if strings.Contains(string(app.Payload()), "magic string") {
fmt.Println("Found magic string in a packet!")
}
}
}
packet := gopacket.NewPacket(myPacketData, layers.LayerTypeEthernet, gopacket.Default)
if err := packet.ErrorLayer(); err != nil {
fmt.Println("Error decoding some part of the packet:", err)
}
Flow 和 Endpoint
Endpoint 是源或目的地的可哈希表示。比如,对于 LayerTypeIPv4,Endpoint 包含 v4 IP 数据包的 IP 地址字节。Flow 可以分解为 Endpoint,Endpoint 可以组合成 Flow:
packet := gopacket.NewPacket(myPacketData, layers.LayerTypeEthernet, gopacket.Lazy)
netFlow := packet.NetworkLayer().NetworkFlow()
src, dst := netFlow.Endpoints()
reverseFlow := gopacket.NewFlow(dst, src)
flows := map[gopacket.Endpoint]chan gopacket.Packet
packet := gopacket.NewPacket(myPacketData, layers.LayerTypeEthernet, gopacket.Lazy)
// Send all TCP packets to channels based on their destination port.
if tcp := packet.Layer(layers.LayerTypeTCP); tcp != nil {
flows[tcp.TransportFlow().Dst()] <- packet
}
// Look for all packets with the same source and destination network address
if net := packet.NetworkLayer(); net != nil {
src, dst := net.NetworkFlow().Endpoints()
if src == dst {
fmt.Println("Fishy packet has same network source and dst: %s", src)
}
}
// Find all packets coming from UDP port 1000 to UDP port 500
interestingFlow := gopacket.FlowFromEndpoints(layers.NewUDPPortEndpoint(1000), layers.NewUDPPortEndpoint(500))
if t := packet.NetworkLayer(); t != nil && t.TransportFlow() == interestingFlow {
fmt.Println("Found that UDP flow I was looking for!")
}
出于负载均衡的目的,Flow 和 Endpoint 都拥有 FastHash() 函数,该函数提供其内容的快速、非加密散列。特别重要的是 Flow FastHash() 是对称的:A -> B 与 B -> A 具有相同的哈希值。示例用法如下:
channels := [8]chan gopacket.Packet
for i := 0; i < 8; i++ {
channels[i] = make(chan gopacket.Packet)
go packetHandler(channels[i])
}
for packet := range getPackets() {
if net := packet.NetworkLayer(); net != nil {
channels[int(net.NetworkFlow().FastHash()) & 0x7] <- packet
}
}
实现自己的解码器
如果你的网络有一些奇怪的封装,那么可以实现自己的解码器。在本例中,我么处理用 4 字节头封装的 Ethernet 数据包。
// Create a layer type, should be unique and high, so it doesn't conflict,
// giving it a name and a decoder to use.
var MyLayerType = gopacket.RegisterLayerType(12345, gopacket.LayerTypeMetadata{Name: "MyLayerType", Decoder: gopacket.DecodeFunc(decodeMyLayer)})
// Implement my layer
type MyLayer struct {
StrangeHeader []byte
payload []byte
}
func (m MyLayer) LayerType() gopacket.LayerType { return MyLayerType }
func (m MyLayer) LayerContents() []byte { return m.StrangeHeader }
func (m MyLayer) LayerPayload() []byte { return m.payload }
// Now implement a decoder... this one strips off the first 4 bytes of the
// packet.
func decodeMyLayer(data []byte, p gopacket.PacketBuilder) error {
// Create my layer
p.AddLayer(&MyLayer{data[:4], data[4:]})
// Determine how to handle the rest of the packet
return p.NextDecoder(layers.LayerTypeEthernet)
}
// Finally, decode your packets:
p := gopacket.NewPacket(data, MyLayerType, gopacket.Lazy)
使用 DecodingLayerParser 快速解码
TLDR(Too Long,Didn't Read):DecodingLayerParser 解码数据包数据花费的时间大约是 NewPacket 的 10%,但仅适用于已知的数据包堆栈。
DecodingLayerParser 通过直接将数据包层解码进预分配对象的方式,完全地避免内存分配,然后可以引用这些对象,获取数据包的信息。示例如下:
func main() {
var eth layers.Ethernet
var ip4 layers.IPv4
var ip6 layers.IPv6
var tcp layers.TCP
parser := gopacket.NewDecodingLayerParser(layers.LayerTypeEthernet, ð, &ip4, &ip6, &tcp)
decoded := []gopacket.LayerType{}
for packetData := range somehowGetPacketData() {
if err := parser.DecodeLayers(packetData, &decoded); err != nil {
fmt.Fprintf(os.Stderr, "Could not decode layers: %v\n", err)
continue
}
for _, layerType := range decoded {
switch layerType {
case layers.LayerTypeIPv6:
fmt.Println(" IP6 ", ip6.SrcIP, ip6.DstIP)
case layers.LayerTypeIPv4:
fmt.Println(" IP4 ", ip4.SrcIP, ip4.DstIP)
}
}
}
}
这里需要注意的重要事项是,解析器修改传入的层(eth、ip4、ip6、tcp),而非分配新层,因此极大地加快了解码过程。它甚至基于层类型进行分支...它将处理 (eth, ip4, tcp) 或 (eth, ip6, tcp) 堆栈。但它不处理任何其它类型...由于没有传入其它解码器, (eth, ip4, udp) 堆栈将在 ip4 后,停止解码,并且只通过 “decoded” 切片传回 [LayerTypeEthernet, LayerTypeIPv4](以及说明无法解码 UDP 数据包的错误)。
使用 DecodingLayerContainer 实现更快的和自定义解码
默认情况下,DecodingLayerParser 使用原生 Map 存储和搜索要解码的层。虽然是通用的,但在某些情况下,该方案可能不是最优的。比如,如果只有几层,那么通过稀疏数组索引或线性数组扫描可能提供更快的操作。
为适应这些场景,引入了 DecodingLayerContainer 接口及其实现:DecodingLayerSparse、DecodingLayerArray 和 DecodingLayerMap。可以使用 SetDecodingLayerContainer 方法指定 DecodingLayerParser 的容器实现。示例:
dlp := gopacket.NewDecodingLayerParser(LayerTypeEthernet)
dlp.SetDecodingLayerContainer(gopacket.DecodingLayerSparse(nil))
var eth layers.Ethernet
dlp.AddDecodingLayer(ð)
// ... add layers and use DecodingLayerParser as usual...
如果想跳过间接层(尽管牺牲一些功能),那么可以使用 DecodingLayerContainer 作为解码工具。在这种情况下,必须自己处理未知的层类型和 Panic。示例:
func main() {
var eth layers.Ethernet
var ip4 layers.IPv4
var ip6 layers.IPv6
var tcp layers.TCP
dlc := gopacket.DecodingLayerContainer(gopacket.DecodingLayerArray(nil))
dlc = dlc.Put(ð)
dlc = dlc.Put(&ip4)
dlc = dlc.Put(&ip6)
dlc = dlc.Put(&tcp)
// you may specify some meaningful DecodeFeedback
decoder := dlc.LayersDecoder(LayerTypeEthernet, gopacket.NilDecodeFeedback)
decoded := make([]gopacket.LayerType, 0, 20)
for packetData := range somehowGetPacketData() {
lt, err := decoder(packetData, &decoded)
if err != nil {
fmt.Fprintf(os.Stderr, "Could not decode layers: %v\n", err)
continue
}
if lt != gopacket.LayerTypeZero {
fmt.Fprintf(os.Stderr, "unknown layer type: %v\n", lt)
continue
}
for _, layerType := range decoded {
// examine decoded layertypes just as already shown above
}
}
}
DecodingLayerSparse 是最快的,但在使用的层可以解码的 LayerType 值不大时最有效,否则可能导致更大的内存占用。DecodingLayerArray 非常紧凑,主要用于解码层数不多的情况(最多约 10-15 层,但请自行进行基准测试)。DecodingLayerMap 是最通用的,默认情况下,DecodingLayerParser 使用 DecodingLayerMap。请参考层子包中的测试和基准测试,以进一步了解使用示例和性能度量。
如果希望使用自己内部的数据包解码逻辑,那么可以选择实现自己的 DecodingLayerContainer。
创建数据包数据
ip := &layers.IPv4{
SrcIP: net.IP{1, 2, 3, 4},
DstIP: net.IP{5, 6, 7, 8},
// etc...
}
buf := gopacket.NewSerializeBuffer()
opts := gopacket.SerializeOptions{} // See SerializeOptions for more details.
err := ip.SerializeTo(buf, opts)
if err != nil { panic(err) }
fmt.Println(buf.Bytes()) // prints out a byte slice containing the serialized IPv4 layer.
buf := gopacket.NewSerializeBuffer()
opts := gopacket.SerializeOptions{}
gopacket.SerializeLayers(buf, opts,
&layers.Ethernet{},
&layers.IPv4{},
&layers.TCP{},
gopacket.Payload([]byte{1, 2, 3, 4}))
packetData := buf.Bytes()
最后说明
如果使用 gopacket,那么几乎肯定希望确保已导入 gopacket/layers,因为导入时,它将设置所有 LayerType 变量,填充许多令人关注的变量/映射(DecodersByLayerName 等)。因此,建议即便不直接使用任何层函数,仍然使用下面的代码导入它:
import (
_ "github.com/google/gopacket/layers"
)
TCP 流重组
环境说明
操作系统:Ubuntu 20.10
$ go env -w GOPROXY=https://goproxy.cn,direct
安装 libpcap
$ tar zxvf libpcap-1.10.4.tar.gz
$ cd libpcap-1.10.4/
$ sudo apt install -y gcc flex bison make
$ ./configure
$ make
$ sudo make install
$ sudo ldconfig
# 查看
$ sudo ldconfig -p | grep -i pcap
TCP 流重组示例
$ mkdir gopacket-tcp-reassembly-test
$ cd gopacket-tcp-reassembly-test/
$ go mod init gopacket-tcp-reassembly-test
$ go get -u github.com/google/[email protected]
在项目目录下,创建 main.go:
package main
import (
"bufio"
"bytes"
"flag"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/tcpassembly"
"github.com/google/gopacket/tcpassembly/tcpreader"
"io"
"log"
"net/http"
"sync"
"time"
)
// Cache 用于检索响应对应的请求。这样做的原因包括:
// 1. 解析 HTTP 响应时,需要相应的请求对象
// 2. 将响应和请求配对
type Cache struct {
mtx sync.RWMutex
// 格式为:{网络层哈希值: {传输层哈希值: 请求对象, ...}, ...}
m map[uint64]map[uint64]*http.Request
// 用于判断 Flow 是请求还是响应
requests map[string]struct{}
}
func NewCache() *Cache {
return &Cache{
m: make(map[uint64]map[uint64]*http.Request),
requests: make(map[string]struct{}),
}
}
// Get 用于在解析响应前,从 Cache 中获取请求
func (c *Cache) Get(netHash, transportHash uint64) *http.Request {
c.mtx.RLock()
defer c.mtx.RUnlock()
if transportMap, found := c.m[netHash]; found {
if r, found := transportMap[transportHash]; found {
return r
}
}
return nil
}
// Set 用于在解析完请求后,将其保存到 Cache 中
func (c *Cache) Set(net, transport gopacket.Flow, r *http.Request) {
c.mtx.Lock()
defer c.mtx.Unlock()
transportMap, found := c.m[net.FastHash()]
if found {
transportMap[transport.FastHash()] = r
} else {
c.m[net.FastHash()] = map[uint64]*http.Request{transport.FastHash(): r}
}
c.requests[net.Src().String()+":"+transport.Src().String()] = struct{}{}
}
// Delete 用于从 Cache 中删除条目
func (c *Cache) Delete(net, transport gopacket.Flow) {
c.mtx.Lock()
defer c.mtx.Unlock()
// 如果是请求,那么删除 requests 中的条目
key := net.Src().String() + ":" + transport.Src().String()
if _, found := c.requests[key]; found {
delete(c.requests, key)
return
}
// 如果是响应,那么删除缓存的请求对象
transportMap, found := c.m[net.FastHash()]
if !found {
return
}
delete(transportMap, transport.FastHash())
if len(transportMap) == 0 {
delete(c.m, net.FastHash())
}
}
var cache = NewCache()
// httpStream 真正地处理 HTTP 请求的解码
type httpStream struct {
net, transport gopacket.Flow
r tcpreader.ReaderStream
}
func (h *httpStream) run() {
buf := bufio.NewReader(&h.r)
for {
var err error
// Magic Number 用于确定协议。比如 HTTP 请求报文、HTTP 响应报文
var magicNumber []byte
var resp *http.Response
var req *http.Request
isResponse := false
// PEEK(返回下 N 个字节,但不推进 Reader)出前 2 个字节,以确定 Magic Number
magicNumber, err = buf.Peek(2)
if err != nil {
// 如果遇到其它错误,那么先打印错误信息,再返回
if err != io.EOF {
log.Printf("Error peeking magic number, %s\n", err)
} else {
// 如果遇到 EOF,那么清理缓存
cache.Delete(h.net, h.transport)
}
return
}
// 如果前 2 个字节是 HT,那么认为该 Flow 是 HTTP 响应
if bytes.Equal(magicNumber, []byte{'H', 'T'}) {
isResponse = true
}
if isResponse {
resp, err = http.ReadResponse(buf, cache.Get(h.net.FastHash(), h.transport.FastHash()))
} else {
if req, err = http.ReadRequest(buf); err == nil {
cache.Set(h.net, h.transport, req)
}
}
if err == io.EOF {
// 必须读到 EOF...非常重要!
cache.Delete(h.net, h.transport)
return
} else if err != nil {
log.Printf("Error reading stream, %s, %s, %s", h.net, h.transport, err)
continue
}
if req != nil {
body, _ := io.ReadAll(req.Body)
_ = req.Body.Close()
log.Printf("http request: %#v\n", *req)
log.Println(string(body))
} else {
body, _ := io.ReadAll(resp.Body)
// 读完 resp.Body 时,必须调用 resp.Body.Close
_ = resp.Body.Close()
log.Printf("http response: %#v\n", *resp)
log.Println(string(body))
}
}
}
// 使用 tcpassembly.StreamFactory 和 tcpassembly.Stream 接口,构建 HTTP 请求解析器。
// httpStreamFactory 实现 tcpassembly.StreamFactory
type httpStreamFactory struct{}
func (h *httpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
hs := &httpStream{
net: net,
transport: transport,
r: tcpreader.NewReaderStream(),
}
// 重要...必须保证读取 reader 流的数据
go hs.run()
// ReaderStream 实现 tcpassembly.Stream, 因此可以返回指向它的指针
return &hs.r
}
func main() {
// 解析命令行参数
var iface = flag.String("i", "eth0", "Interface to get packets from")
var snaplen = flag.Int("s", 1600, "SnapLen for pcap packet capture")
flag.Parse()
var handle *pcap.Handle
var err error
// pcap.OpenLive 打开设备,返回 *pcap.Handle。
// 它接受设备名称(eth0),每个数据包的最大大小(snaplen),是否将接口设置为混杂模式(即是否接收目的地址不为本机的包),以及超时作为参数。
// 警告:该函数仅接受毫秒时间戳。对于纳秒解决方案,使用 pcap.InactiveHandle。
// 如果将 timeout 设置为 30s,那么每 30s 刷新一次数据包;设置成负数,将立刻刷新数据包
handle, err = pcap.OpenLive(*iface, int32(*snaplen), true, time.Second)
if err != nil {
log.Panic(err)
}
// 务必释放 Handle
defer handle.Close()
// 设置 TCP 流重组
// 1. 创建 httpStreamFactory 结构体,实现 tcpassembly.StreamFactory 接口
streamFactory := &httpStreamFactory{}
// 2. 创建连接池
streamPool := tcpassembly.NewStreamPool(streamFactory)
// 3. 创建重组器
assembler := tcpassembly.NewAssembler(streamPool)
// 创建数据包源。
// handle.LinkType() 表示从 2 层以太网链路上抓取数据包
packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
// 从该 Channel 中,持续地读取数据包
packets := packetSource.Packets()
ticker := time.Tick(time.Minute)
for {
select {
case packet := <-packets:
if packet.NetworkLayer() == nil || packet.TransportLayer() == nil ||
packet.TransportLayer().LayerType() != layers.LayerTypeTCP {
log.Println("Unusable packet")
continue
}
tcp := packet.TransportLayer().(*layers.TCP)
// 4. 将数据包添加到重组器中
assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
case <-ticker:
// 每隔 1 分钟,刷新之前 2 分钟内不活跃的连接
assembler.FlushOlderThan(time.Now().Add(time.Minute * -2))
}
}
}
$ sudo apt install -y nginx
# 测试 Nginx 是否成功启动
$ curl http://127.0.0.1
# 在项目目录下执行
$ sudo go run main.go -i lo
$ curl http://127.0.0.1
其它示例
迭代当前机器上的所有网络接口
package main
import (
"fmt"
"github.com/google/gopacket/pcap"
"log"
)
func main() {
// 尝试迭代当前机器上的所有接口
devices, err := pcap.FindAllDevs()
if err != nil {
log.Panic(err)
}
// 打印设备信息
fmt.Println("Devices found:")
for _, device := range devices {
fmt.Println("\nName: ", device.Name)
fmt.Println("Description: ", device.Description)
fmt.Println("Devices addresses:")
for _, address := range device.Addresses {
fmt.Println("- IP address: ", address.IP)
fmt.Println("- Subnet mask: ", address.Netmask)
}
}
}
参考文档
1. https://pkg.go.dev/github.com/google/gopacket
关于Portal Lab
星阑科技 Portal Lab 致力于前沿安全技术研究及能力工具化。主要研究方向为API 安全、应用安全、攻防对抗等领域。实验室成员研究成果曾发表于BlackHat、HITB、BlueHat、KCon、XCon等国内外知名安全会议,并多次发布开源安全工具。未来,Portal Lab将继续以开放创新的态度积极投入各类安全技术研究,持续为安全社区及企业级客户提供高质量技术输出。