共计 5680 个字符,预计需要花费 15 分钟才能阅读完成。
在业务变得越来越复杂时,我们的架构也会随之演变。在我们架构演变成 TCP 服务的时候,我们就需要定义客户端和服务端的消息格式。这篇文章我们将使用 Go 语言作为服务端语言与其它语言通信。在此之前你需要稍微了解 TCP 协议。
- 介绍
- 自定义消息格式
- 服务端实现
- 客户端实现
- 完整代码
- 服务端
- 客户端
- 其它语言客户端
- PHP
- Java
介绍
在 Go 里面,我们使用创建一个 TCP
连接非常简单,只需要导入 net 即可。处理 io
也非常简单只需要导入 io 库即可. 就像下面的代码一样轻松实现一个tcp 服务
。
ln, err := net.Listen("tcp", ":9527")
if err != nil {
// handle error
log.Fatal(err)
}
for {conn, err := ln.Accept()
if err != nil {
// handle error
log.Fatal(err)
}
go function(conn net.Conn) {read := make([]byte, 4)
c.Read(read)
conn.Close()}(conn)
}
自定义消息格式
自定义消息格式就是服务端和客户端约定好消息的格式,比如消息头是多少字节(TCP 基本都是使用二进制数据),消息长度多少字节,消息内容两边按照约定的规则去读取解析数据。
包头[字节] + 包内容长度[字节] + 包体[字节]
比如我先将包头和包长度读取出来,解析之后一次性读出包体.
使用自定义消息格式,我们很容易判断出客户端发过来的内容是否合法,扩展性也很强,只需要这个格式实现服务就行了,也节省的带宽消耗(相对来说).
服务端实现
上面也说过实现 Go 的 TCP 服务很简单,只需要几行代码就 OK,我们在上面代码的基础加上数据的读取解析和写入。
我们需要定义一个协议结构体,拆包 / 解包函数.
- 协议结构体
type protocol struct {
Length uint32 // 内容长度
Content []byte // 内容
}
- 解包
// 解包,先读取 4 个字节转换成整形,再读包长度字节
func UnPacket(c net.Conn) (*Protocol, error) {
var (
p = &Protocol{}
header = make([]byte, HEADER_LEN)
)
_, err := io.ReadFull(c, header)
if err != nil {
return p, err
}
p.Length = binary.BigEndian.Uint32(header) // 转换成 10 进制的数字
contentByte :=make([]byte, p.Length)
_, e := io.ReadFull(c, contentByte) // 读取内容
if e != nil {
return p, e
}
p.Content = contentByte
return p, nil
}
实现了解包,我们就可以处理 Content
内容,可以约定为 string
, json
这里我们约定为 json
. 我们将服务协议 ID 定义到content
里面.
- 解析内容约定好的 json 格式:
{
"serviceId": "", // 协议名称, 比如调用,
"data": {} // 参数
}func (p *Protocol) parseContent() (map[string]interface{}, error) {
var object map[string]interface{}
unmarshal := json.Unmarshal(p.Content, &object)
if unmarshal != nil {
return object, unmarshal
}
return object, nil
}
客户端实现
上面的代码已经完成了服务端解包,解析内容。我们现在实现客户端, 让服务端打印客户端的内容将内容再发送给客户端. 我们需要一个组合包的方法和控制台提取输入内容的实现.
- 生成包体
func Packet(serviceId string, content string) []byte {
bytes, _ := json.Marshal(Content{ServiceId:serviceId, Data:content})
buffer := make([]byte, HEADER_LEN + len(bytes))
// 将 buffer 前面四个字节设置为包长度,大端序
binary.BigEndian.PutUint32(buffer[0:4], uint32(len(bytes)))
copy(buffer[4:], bytes)
return buffer
}
完整代码
服务端
package main
import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
//"io"
"log"
"net"
)
type Protocol struct {
Length uint32
Content []byte}
const HEADER_LEN = 4
func Packet(content string) []byte {buffer := make([]byte, HEADER_LEN + len(content))
// 将 buffer 前面四个字节设置为包长度,大端序
binary.BigEndian.PutUint32(buffer[0:4], uint32(len(content)))
copy(buffer[4:], content)
return buffer
}
// 解包,先读取 4 个字节转换成整形,再读包长度字节
func UnPacket(c net.Conn) (*Protocol, error) {
var (p = &Protocol{}
header = make([]byte, HEADER_LEN)
)
_, err := io.ReadFull(c, header)
if err != nil {return p, err}
p.Length = binary.BigEndian.Uint32(header) // 转换成 10 进制的数字
contentByte :=make([]byte, p.Length)
_, e := io.ReadFull(c, contentByte) // 读取内容
if e != nil {return p, e}
p.Content = contentByte
return p, nil
}
func (p *Protocol) parseContent() (map[string]interface{}, error) {var object map[string]interface{}
unmarshal := json.Unmarshal(p.Content, &object)
if unmarshal != nil {return object, unmarshal}
return object, nil
}
func main() {l, err := net.Listen("tcp", ":9527")
if err != nil {log.Fatal(err)
}
defer l.Close()
for {
// Wait for a connection.
conn, err := l.Accept()
if err != nil {log.Fatal(err)
}
// Handle the connection in a new goroutine.
// The loop then returns to accepting, so that
// multiple connections may be served concurrently.
go func(c net.Conn) {protocol, _ := UnPacket(c)
parseContent, err := protocol.parseContent()
if (err != nil) { }
s := parseContent["serviceId"].(string)
cstr := parseContent["data"].(string)
if s == "Hello.world" {fmt.Printf("serviceId: %s, content: %s", s, cstr)
writeByte := []byte(cstr)
c.Write(writeByte);
}
c.Close()}(conn)
}
}
客户端
package main
import (
"bufio"
"encoding/binary"
"encoding/json"
"fmt"
"log"
"net"
"os"
)
const HEADER_LEN = 4
type Content struct {
ServiceId string `json:"serviceId"`
Data interface{} `json:"data"`}
func Packet(serviceId string, content string) []byte {bytes, _ := json.Marshal(Content{ServiceId:serviceId, Data:content})
buffer := make([]byte, HEADER_LEN + len(bytes))
// 将 buffer 前面四个字节设置为包长度,大端序
binary.BigEndian.PutUint32(buffer[0:4], uint32(len(bytes)))
copy(buffer[4:], bytes)
return buffer
}
func main() {conn, e := net.Dial("tcp", ":9527")
if e != nil {log.Fatal(e)
}
reader := bufio.NewReader(os.Stdin)
fmt.Print("Text to send:")
text, _ := reader.ReadString('\n')
//buffer := new(bytes.Buffer)
buffer := Packet("Hello.world", text)
conn.Write(buffer)
// listen for reply
message, _ := bufio.NewReader(conn).ReadString('\n')
fmt.Print("Message from server:" + message)
defer conn.Close()}
各位可以建两个文件测试下输出内容, 下面是我测试的结果:
- 客户端
Text to send: 我是骑驴找蚂蚁
Message from server: 我是骑驴找蚂蚁 - 服务端
serviceId: Hello.world, content: 我是骑驴找蚂蚁
其它语言客户端
我将使用一些其它语言实现的客户端来发送消息。在一些大型项目中多语言交互是很正常的事情。
PHP
<?php
$sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_connect($sock, '127.0.0.1', 9527);
$content = json_encode(["serviceId" => "Hello.world",
"data" => "我是 php 发的消息"
], JSON_UNESCAPED_UNICODE);
$binContent = pack("N", strlen($content)) . $content;
socket_write($sock, $binContent);
echo socket_read($sock, 1024);
socket_close($sock);
?>
# 客户端
[root@localhost]
php socketClient.php
服务端返回: 我是 php 发的消息
# 服务端
serviceId: Hello.world, content: 我是 php 发的消息
Java
package com.loocode;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
/**
* Hello world!
*
*/
public class Socket {public static void main( String[] args ) throws IOException {java.net.Socket socket = new java.net.Socket();
InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 9527);
socket.connect(socketAddress);
if (socket.isConnected()) {System.out.println( "连接服务成功!");
}
OutputStream outputStream = new BufferedOutputStream(socket.getOutputStream());
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
String userInput = "{\"serviceId\": \"Hello.world\", \"data\": \"" + stdIn.readLine() + "\"}";
int len = userInput.getBytes().length;
byte[] headerSize = ByteBuffer.allocate(4).putInt(len).array();// 转换成字节
outputStream.write(headerSize);
outputStream.write(userInput.getBytes());
outputStream.flush();
System.out.println("echo:" + in.readLine());
socket.close();}
}
# 客户端
连接服务成功!
我是 java 发过来的消息
echo: 我是 java 发过来的消息
#服务端
serviceId: Hello.world, content: 我是 java 发过来的消息