frp v0.1.0 源码浅析

有时间了去读读小项目的源代码感觉是很有意义的一件事情,就像去了解一些东西 how it works
这里选取一个比较有有意思的项目frp,是一个蛮好玩的反向代理的工具来实现内网穿透。

这里选取来 v0.1.0 的版本,因为功能简单以及代码量少把。

简介

整个系统由两个部分组成,frpc,frps ,分别是客户端和服务端。先从服务端开始分析。

服务端

函数入口

先从程序入口开始分析main()

func main() {
    err := client.LoadConf("./frpc.ini")
    if err != nil {
        os.Exit(-1)
    }

    log.InitLog(client.LogWay, client.LogFile, client.LogLevel)

    // wait until all control goroutine exit
    var wait sync.WaitGroup
    wait.Add(len(client.ProxyClients))

    for _, client := range client.ProxyClients {
        go ControlProcess(client, &wait)
    }

    log.Info("Start frpc success")

    wait.Wait()
    log.Warn("All proxy exit!")
}

:2对对配置进行解析,处理化 client,
日志模块是直接使用的 现有的库"github.com/astaxie/beego/logs"


这里的一句 var wait sync.WaitGroup 来创建一个 waitGroup的对象,来使得主线程阻塞来等待并发过程。Add(),Done(),Wait() add 来添加一个计数,done来减去一个计数,wair阻塞等待。

在这里使用 wait.Add(len(client.ProxyClients)) 根据已经按照配置的数量初始化的实例,来设置并发计数。


对client的list进行便利,并发开启服务线程。go 这里代表并发执行, 传入 client,以及wait的引用(在返回之后,可以进行done的操作)

for _, client := range client.ProxyClients {
    go ControlProcess(client, &wait)
}

ControlProcess

正如前面说到的逻辑,传入 wait,在后面会使用 done 来进行计数减。ControlProcess的原型如下:

func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
    defer wait.Done()

形参的类型很有意思,找到了C的感觉,
另外这里顺便给出了第一句,defer 用于在函数返回时处理资源等等问题。defer一次,相当于把后面的操作进行压栈,在return 之后,弹栈执行。


在创建proxy 第一步是登录验证loginToServer,函数原型如下:


func loginToServer(cli *client.ProxyClient) (c *conn.Conn, err error) {

传入连接实例,返回登录状态以及连接。使用 ConnectServer 进行tcp建连。构建连接请求体:

req := &msg.ClientCtlReq{
    Type:      consts.CtlConn,
    ProxyName: cli.Name,
    Passwd:    cli.Passwd,
}
// 这里是
type ClientCtlReq struct {
    Type      int64  <code>json:"type"</code>
    ProxyName string <code>json:"proxy_name"</code>
    Passwd    string <code>json:"passwd"</code>
}

正常的使用tcp来进行的连接操作,之后使用json来对回传的包来进行解析。来饭端回包,是否登录成功。如果成功的话go startHeartBeat(c) 来进行心跳定时。心跳服务的函数如下:

for {
    time.Sleep(time.Duration(client.HeartBeatInterval) * time.Second)
    if c != nil && !c.IsClosed() {
        err = c.Write(string(request) + "\n")
        if err != nil {
            log.Error("Send hearbeat to server failed! Err:%v", err)
            continue
        }
    } else {
        break
    }
}

去定时的发送 request作为系统的心跳包,如果出现写socket 失败的情况,中止心跳进程,因为有 defer heartBeatTimer.Stop() 所以,定时任务也会自动的停止


回到 control.go的第一层,会有一个 循环结构:

for {
    content, err := connection.ReadLine()

在循环体内部,先对可能出现的异常情况进行判断,最后调用代理实例的StartTunnel


Client

这里的 Client 就是上面的proxy 的实例,先贴出来 starttunnel 的函数内容:

func (p *ProxyClient) StartTunnel(serverAddr string, serverPort int64) (err error) {
    localConn, err := p.GetLocalConn()
    if err != nil {
        return
    }
    remoteConn, err := p.GetRemoteConn(serverAddr, serverPort)
    if err != nil {
        return
    }

    // l means local, r means remote
    log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", localConn.GetLocalAddr(), localConn.GetRemoteAddr(),
        remoteConn.GetLocalAddr(), remoteConn.GetRemoteAddr())
    go conn.Join(localConn, remoteConn)
    return nil
}

先对配置中的的端口进行连接GetLocalConn 得到 client到本机的连接。之后进行远程连接,连接之后主动发包,说明进入工作状态:

    req := &msg.ClientCtlReq{
        Type:      consts.WorkConn,
        ProxyName: p.Name,
        Passwd:    p.Passwd,
    }

    buf, _ := json.Marshal(req)
    err = c.Write(string(buf) + "\n")
    if err != nil {
        log.Error("ProxyName [%s], write to server error, %v", p.Name, err)
        return
    }

在得到了本地的连接实例和远程的连接实例之后,进行隧道连接也就是最核心的隧道功能了,代码如下:

// will block until connection close
func Join(c1 *Conn, c2 *Conn) {
    var wait sync.WaitGroup
    pipe := func(to *Conn, from *Conn) {
        defer to.Close()
        defer from.Close()
        defer wait.Done()

        var err error
        _, err = io.Copy(to.TcpConn, from.TcpConn)
        if err != nil {
            log.Warn("join conns error, %v", err)
        }
    }

    wait.Add(2)
    go pipe(c1, c2)
    go pipe(c2, c1)
    wait.Wait()
    return
}

这里是使用了 wait,所以在两个 go 并发建立连接之后函数线程会发生阻塞。使用 io.copy 来对两个链接的内容进行正反的两次拷贝,实现连接的穿透。

可以直接使用 IO copy是我没有想到的,所以得去看看 Go 的 net.TCPConn 到底是什么东西。
在官网的手册上找到io.Copy 的原型:

//Copy copies from src to dst until either EOF is reached on src or an error occurs.
func Copy(dst Writer, src Reader) (written int64, err error)

所以这里的copy负责不断的进行连接之间的数据拷贝,直到收到 EOF

至此客户端的部分的源码分析完毕。

服务端

有了客户端的前提上,服务端就一样的清楚了。程序入口:

func main() {
    err := server.LoadConf("./frps.ini")
    if err != nil {
        os.Exit(-1)
    }

    log.InitLog(server.LogWay, server.LogFile, server.LogLevel)

    l, err := conn.Listen(server.BindAddr, server.BindPort)
    if err != nil {
        log.Error("Create listener error, %v", err)
        os.Exit(-1)
    }

    log.Info("Start frps success")
    ProcessControlConn(l)
}

在这里进行tcp的端口监听,Listen

go func() {
    for {
        conn, err := l.l.AcceptTCP()
        if err != nil {
            if l.closeFlag {
                return
            }
            continue
        }
        c := &Conn{
            TcpConn:   conn,
            closeFlag: false,
        }
        c.Reader = bufio.NewReader(c.TcpConn)
        l.conns <- c
    }
}()

以为只是一个监听的函数太在意,发现这里是建连的核心函数。在这里启动了一个并发,使用for循环来get到这个端口的tcp建连,并且创建连接实例使用 l.conns <- c 来把新建立的连接放在 channel内。

之后进入ProcessControlConn,其处理循环如下:

func ProcessControlConn(l *conn.Listener) {
    for {
        c, err := l.GetConn()
        if err != nil {
            return
        }
        log.Debug("Get one new conn, %v", c.GetRemoteAddr())
        go controlWorker(c)
    }
}
func (l *Listener) GetConn() (conn *Conn, err error) {
    var ok bool
    conn, ok = <-l.conns
    if !ok {
        return conn, fmt.Errorf("channel close")
    }
    return conn, nil
}

这里可以看的,getConn 的操作回去从 连接的chnnel(理解为队列)去拿已经建立的连接,如果没有连接则会阻塞。得到连接之后go controlWorker(c)


这里的worker就是和客户端建连之后的服务部分了,可能的功能有,登录,接受命令,打开端口,join连接。

worker中先进行一次 socket的读,和前面一样对json来进行解析,结构和上文一致:

    Type      int64  <code>json:"type"</code>
    ProxyName string <code>json:"proxy_name"</code>
    Passwd    string <code>json:"passwd"</code>

之后开始进行checkProxy来根据内容来看是否连接合法,包括 [连接是否存在,秘密是否正确,帧类型]

不知道 直接使用req.Passwd != s.Passwd 来进行密码比较会不会有漏洞


这里重点看一下帧类型的逻辑,因为他决定来这个请求我会做什么,这里分两部分看:

  • consts.CtlConn 建连控制,在服务端打开指定端口
  • consts.WorkConn 报文传输

如果得到的类型是 consts.WorkConn,最终执行p.cliConnChan <- c 把我们此次的数据存入队列。


如果报文对应的类型为 consts.WorkConn,先对连接状态进行判断,是否已经建连,如果没有的化执行s.Start()

    if req.Type == consts.CtlConn {
        if s.Status != consts.Idle {
            info = fmt.Sprintf("ProxyName [%s], already in use", req.ProxyName)
            log.Warn(info)
            return
        }
        // start proxy and listen for user conn, no block
        err := s.Start()
        if err != nil {
            info = fmt.Sprintf("ProxyName [%s], start proxy error: %v", req.ProxyName, err.Error())
            log.Warn(info)
            return
        }
        log.Info("ProxyName [%s], start proxy success", req.ProxyName)
    }

在这个start内,最终有两个线程,一个是来捕获服务端的外部连接的,另一个是用来连接这个外部链接和来自我们的客户端连接的,这里需要进行同步操作。代码如下:

    go func() {
        for {
            // block
            // if listener is closed, err returned
            c, err := p.listener.GetConn()
            if err != nil {
                log.Info("ProxyName [%s], listener is closed", p.Name)
                return
            }
            log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr())

            // insert into list
            p.Lock()
            if p.Status != consts.Working {
                log.Debug("ProxyName [%s] is not working, new user conn close", p.Name)
                c.Close()
                p.Unlock()
                return
            }
            p.userConnList.PushBack(c)
            p.Unlock()

            // put msg to control conn
            p.ctlMsgChan <- 1

            // set timeout
            time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() {
                p.Lock()
                defer p.Unlock()
                element := p.userConnList.Front()
                if element == nil {
                    return
                }

                userConn := element.Value.(*conn.Conn)
                if userConn == c {
                    log.Warn("ProxyName [%s], user conn [%s] timeout", p.Name, c.GetRemoteAddr())
                }
            })
        }
    }()

这一部分的代码首先收到了客户端的控制包,那么在本机(服务端)打开监听的端口,加锁之后,把连接存入 userConnList,之后对消息队列写1 p.ctlMsgChan <- 1
在一个超时时间,等待有没有客户端的连接回连,如果list就我自己的c连接,没有新的连接,那么说明没有新的连接加入,等待客户端超时。

    // start another goroutine for join two conns from client and user
    go func() {
        for {
            cliConn, ok := <-p.cliConnChan
            if !ok {
                return
            }

            p.Lock()
            element := p.userConnList.Front()

            var userConn *conn.Conn
            if element != nil {
                userConn = element.Value.(*conn.Conn)
                p.userConnList.Remove(element)
            } else {
                cliConn.Close()
                p.Unlock()
                continue
            }
            p.Unlock()

            // msg will transfer to another without modifying
            // l means local, r means remote
            log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", cliConn.GetLocalAddr(), cliConn.GetRemoteAddr(),
                userConn.GetLocalAddr(), userConn.GetRemoteAddr())
            go conn.Join(cliConn, userConn)
        }
    }()

这一部分代码,首先从队列得到客户端发起的连接,判断是否有外来连接,如果没有,就关闭客户端连接,并继续等待。
如果有外部连接,得到外部链接,并且从连接队列里移除该连接。最终使用go conn.Join(cliConn, userConn) 来join 外部链接和客户端的连接。完成反向代理操作。至此服务端工作流程完毕。

总结

  • 客户端把自己的本地连接和远程连接进行join
  • 客户端发起两种连接请求,Ctrl 连接和 working
  • 服务器根据 ctrl 里的控制信息对本地的端口进行监听,并且回写 1 到ctrl连接表示成功建连
  • 服务端对listener的建连入队,之后分别对working 的连接进行join

Go还蛮有意思的,但是觉得异步的过分自由化,会不会导致流程上的问题。
defer的设计和chennel 的设计还是很好用的。return 的变量在定义时就被指定,提高来规范性。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注