字符串消息的序列化
在上一篇文章中,我们使用MQTTnet
框架,实现了一个MQTT服务器、MQTT发布者进程、MQTT订阅者进程。在消息传递过程中,我们将控制台的字符串直接传递。因为MQTT是应用层协议,它是基于TCP协议进行数据传输。我们 直到TCP本身是基于字节流的传输协议。所以我们的字符串最终会 序列化成自己数组进行数据传输。我们先来看下发布者发送消息的代码:
string msg;
while (true)
{
Console.WriteLine("请输入需要发送的消息:");
msg = Console.ReadLine();
msg = "time:" + DateTime.Now.ToString("yyyy-MM-dd HH:MM:ss") + " msg:" + msg;
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("mytopic")
.WithPayload(msg)
.Build();
var result = await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);
Console.WriteLine("成功向MQTT服务器发布消息.....");
}
可以看到WithPayload
函数我们直接设置了一个字符串,然后我们来看下MQTT是 如何实现WithPayload
这个函数的源代码的,下面代码可以和明显看到,该函数调用了Encoding.UTF8.GetBytes(payload)
方法将字符串转换为字节数组,然后再发送。Encoding.UTF8.GetBytes
是C#提供了的一个最简单的序列化函数。这种直接将字符串序列化为,这种序列化其实就是文本序列化。在实际的开发中,比如游戏开发,我们通常使用二进制序列化协议而不是文本序列化协议。
public MqttApplicationMessageBuilder WithPayload(string payload)
{
if (string.IsNullOrEmpty(payload))
{
return WithPayload(default(byte[]));
}
var payloadBuffer = Encoding.UTF8.GetBytes(payload);
return WithPayload(payloadBuffer);
}
文本序列化和二进制序列化
文本序列化是将对象转换为人类可读的文本格式。常见的文本序列化格式包括JSON、XML和YAML。文本序列化的有点和缺点如下:
优点
1. 可读性:序列化后的数据是人类可读的,便于调试和日志记录。
2. 跨平台:大多数编程语言都支持解析和生成常见的文本格式,如JSON和XML。
3. 灵活性:文本格式可以很容易地进行修改和扩展。
缺点
1. 性能:文本序列化通常比二进制序列化慢,因为需要进行字符串解析和生成。
2. 空间效率:文本格式通常比二进制格式占用更多的存储空间。
二进制序列化是将对象转换为紧凑的二进制格式。常见的二进制序列化格式包括Protocol Buffers、MessagePack和Avro。二进制序列化的优点和缺点如下:
优点
1. 性能:二进制序列化通常比文本序列化快,因为不需要进行字符串解析。
2. 空间效率:二进制格式通常比文本格式占用更少的存储空间。
缺点
1. 可读性:序列化后的数据是二进制的,不易于人类阅读和调试。
2. 跨平台:虽然许多二进制格式是跨平台的,但并不是所有编程语言都支持所有二进制格式。
最终的结论就是:二进制序列化,效率高,速度快,体积小,传输效率高。在实际的 业务开发中,我们基本上都是使用二进制序列化这个方案。当然在本篇文章中,我们 将使用MessagePack
这个类库来将对象序列化成二进制字节数组。
MessagePack 是一种高效的二进制序列化格式,和JSON序列化不同的是,MessagePack更快且更小。它由日本工程师 Sadayuki Furuhashi 创建,并在 GitHub 上作为开源项目进行维护。MessagePack 适用于需要高性能和高空间效率的场景,如网络通信、存储和传输大量数据。github开源地址为:https://github.com/MessagePack-CSharp/MessagePack-CSharp。该项目由6.1k的star,是非常优秀的一个二进制序列化开源项目。
接下来,我们就一步步演示如何使用MessagePack
这个类库。
定义消息格式
和我们普通的http请求一样,我们可能会封装一个http报文体的通用格式,里面包含三个字段,code,data,message。当然在这里,我们使用同样的Model来承载消息内容。这是一个最简单的C#类,我们不在做详细介绍,大家一看便知。
public class Message<T>
{
public int Code { get; set; }
public string Msg { get; set; }
public T Data { get; set; }
}
安装MessagePack包
要想使用MessagePack
框架,我们必须首先安装nuget包:MessagePack。安装命令行如下:
dotnet add package MessagePack
对于承载消息体的对象,我们必须对该类添加MessagePackObject
特性,对类中的每一个字段添加Key
特性,并且标记好每个字段的序列,一般从0开始,序列依次递增就可以。修改完的消息类如下:
[MessagePackObject]
public class Message<T>
{
[Key(0)]
public int Code { get; set; }
[Key(1)]
public string Msg { get; set; }
[Key(2)]
public T Data { get; set; }
}
消息序列化
MessagePack
类库中有一个最基础的类MessagePackSerializer
,它提供了两个最基本的函数:Serialize
和Deserialize
分别负责将对象序列化为二进制以及将二进制反序列化为对象。我们使用构建者模式来构建我们最终传输的消息对象。
public class MessageBuilder<T>
{
private int _code;
private string _msg;
private T _data;
public static MessageBuilder<T> Create<T>()
{
return new MessageBuilder<T>();
}
public MessageBuilder<T> WithCode(int code)
{
_code = code;
return this;
}
public MessageBuilder<T> WithMsg(string msg)
{
_msg = msg;
return this;
}
public MessageBuilder<T> WithData(T data)
{
_data = data;
return this;
}
public byte[] Build()
{
var message= new Message<T>
{
Code = _code,
Msg = _msg,
Data = _data
};
byte[] bytes = MessagePackSerializer.Serialize(message);
return bytes;
}
}
消息的反序列化
直接使用最基本的Deserialize
函数对消息进行反序列化。
public class MessageParser
{
public static Message<T> Parse<T>(byte[] bytes)
{
var message = MessagePackSerializer.Deserialize<Message<T>>(bytes);
return message;
}
}
修改MQTT发布者程序
有了上面的消息序列化和反序列化函数之后,我们就可以直接调用。替换MQTT 发布者程序中的消息序列化过程,这里只挑选核心的部分代码展示:
string data;
while (true)
{
Console.Write("请输入需要发送的消息:");
data = Console.ReadLine();
data = "time:" + DateTime.Now.ToString("yyyy-MM-dd HH:MM:ss") + " msg:" + data;
byte[] bytes = MessageBuilder<string>.Create()
.WithCode(200)
.WithMsg("msg")
.WithData(data)
.Build();
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("mytopic")
.WithPayload(bytes)
.Build();
var result = await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);
}
修改MQTT订阅者程序
对于订阅者程序,需要修改接收到消息的回调函数。
var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").Build();
mqttClient.ApplicationMessageReceivedAsync += e =>
{
Message<string> message = MessageParser.Parse<string>(e.ApplicationMessage.Payload.First.Span.ToArray());
Console.WriteLine($"接收到消息:clientid:{e.ClientId},topic:{e.ApplicationMessage.Topic},message:{message.Data}");
return Task.CompletedTask;
};
运行结果
接下来,我们依次启动MQTT服务器进程、订阅者进程和发布者进程,然后看运行结果。
首先是服务器进程运行结果:

其次是订阅者:

最后是发布者,我们用发布者发布三条消息,并且订阅者成功收到了消息。
