字符串消息的序列化

在上一篇文章中,我们使用MQTTnet 框架,实现了一个MQTT服务器、MQTT发布者进程、MQTT订阅者进程。在消息传递过程中,我们将控制台的字符串直接传递。因为MQTT是应用层协议,它是基于TCP协议进行数据传输。我们 直到TCP本身是基于字节流的传输协议。所以我们的字符串最终会 序列化成自己数组进行数据传输。我们先来看下发布者发送消息的代码:

string msg;
while (true)
{
    Console.WriteLine("请输入需要发送的消息:");
    msg = Console.ReadLine();
    msg = "time:" + DateTime.Now.ToString("yyyy-MM-dd HH:MM:ss") + " msg:" + msg;
    var applicationMessage = new MqttApplicationMessageBuilder()
        .WithTopic("mytopic")
        .WithPayload(msg)
        .Build();
    var result = await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);
    Console.WriteLine("成功向MQTT服务器发布消息.....");
}

可以看到WithPayload函数我们直接设置了一个字符串,然后我们来看下MQTT是 如何实现WithPayload这个函数的源代码的,下面代码可以和明显看到,该函数调用了Encoding.UTF8.GetBytes(payload)方法将字符串转换为字节数组,然后再发送。Encoding.UTF8.GetBytes是C#提供了的一个最简单的序列化函数。这种直接将字符串序列化为,这种序列化其实就是文本序列化。在实际的开发中,比如游戏开发,我们通常使用二进制序列化协议而不是文本序列化协议。

public MqttApplicationMessageBuilder WithPayload(string payload)
{
    if (string.IsNullOrEmpty(payload))
    {
        return WithPayload(default(byte[]));
    }

    var payloadBuffer = Encoding.UTF8.GetBytes(payload);
    return WithPayload(payloadBuffer);
}

文本序列化和二进制序列化

文本序列化是将对象转换为人类可读的文本格式。常见的文本序列化格式包括JSON、XML和YAML。文本序列化的有点和缺点如下:

优点
1.    可读性:序列化后的数据是人类可读的,便于调试和日志记录。
2.    跨平台:大多数编程语言都支持解析和生成常见的文本格式,如JSON和XML。
3.    灵活性:文本格式可以很容易地进行修改和扩展。
缺点
1.    性能:文本序列化通常比二进制序列化慢,因为需要进行字符串解析和生成。
2.    空间效率:文本格式通常比二进制格式占用更多的存储空间。

二进制序列化是将对象转换为紧凑的二进制格式。常见的二进制序列化格式包括Protocol Buffers、MessagePack和Avro。二进制序列化的优点和缺点如下:

优点
1.    性能:二进制序列化通常比文本序列化快,因为不需要进行字符串解析。
2.    空间效率:二进制格式通常比文本格式占用更少的存储空间。
缺点
1.    可读性:序列化后的数据是二进制的,不易于人类阅读和调试。
2.    跨平台:虽然许多二进制格式是跨平台的,但并不是所有编程语言都支持所有二进制格式。

最终的结论就是:二进制序列化,效率高,速度快,体积小,传输效率高。在实际的 业务开发中,我们基本上都是使用二进制序列化这个方案。当然在本篇文章中,我们 将使用MessagePack这个类库来将对象序列化成二进制字节数组。

MessagePack 是一种高效的二进制序列化格式,和JSON序列化不同的是,MessagePack更快且更小。它由日本工程师 Sadayuki Furuhashi 创建,并在 GitHub 上作为开源项目进行维护。MessagePack 适用于需要高性能和高空间效率的场景,如网络通信、存储和传输大量数据。github开源地址为:https://github.com/MessagePack-CSharp/MessagePack-CSharp。该项目由6.1k的star,是非常优秀的一个二进制序列化开源项目。

接下来,我们就一步步演示如何使用MessagePack这个类库。

定义消息格式

和我们普通的http请求一样,我们可能会封装一个http报文体的通用格式,里面包含三个字段,code,data,message。当然在这里,我们使用同样的Model来承载消息内容。这是一个最简单的C#类,我们不在做详细介绍,大家一看便知。

public class Message<T>
{
    public int Code { get; set; }
    public string Msg { get; set; }
    public T Data { get; set; }
}

安装MessagePack包

要想使用MessagePack框架,我们必须首先安装nuget包:MessagePack。安装命令行如下:

dotnet add package MessagePack

对于承载消息体的对象,我们必须对该类添加MessagePackObject特性,对类中的每一个字段添加Key特性,并且标记好每个字段的序列,一般从0开始,序列依次递增就可以。修改完的消息类如下:

[MessagePackObject]
public class Message<T>
{
    [Key(0)]
    public int Code { get; set; }
    [Key(1)]
    public string Msg { get; set; }
    [Key(2)]
    public T Data { get; set; }
}

消息序列化

MessagePack类库中有一个最基础的类MessagePackSerializer,它提供了两个最基本的函数:SerializeDeserialize分别负责将对象序列化为二进制以及将二进制反序列化为对象。我们使用构建者模式来构建我们最终传输的消息对象。

public class MessageBuilder<T>
{
    private int _code;
    private string _msg;
    private T _data;
    public static MessageBuilder<T> Create<T>()
    {
        return new MessageBuilder<T>();
    }

    public MessageBuilder<T> WithCode(int code)
    {
        _code = code;
        return this;
    }

    public MessageBuilder<T> WithMsg(string msg)
    {
        _msg = msg;
        return this;
    }

    public MessageBuilder<T> WithData(T data)
    {
        _data = data;
        return this;
    }

    public byte[] Build()
    {
        var message= new Message<T>
        {
            Code = _code,
            Msg = _msg,
            Data = _data
        };
        byte[] bytes = MessagePackSerializer.Serialize(message);
        return bytes;
    }
}

消息的反序列化

直接使用最基本的Deserialize函数对消息进行反序列化。

public class MessageParser
{
    public static Message<T> Parse<T>(byte[] bytes)
    {
        var message = MessagePackSerializer.Deserialize<Message<T>>(bytes);
        return message;
    }
}

修改MQTT发布者程序

有了上面的消息序列化和反序列化函数之后,我们就可以直接调用。替换MQTT 发布者程序中的消息序列化过程,这里只挑选核心的部分代码展示:

string data;
while (true)
{
    Console.Write("请输入需要发送的消息:");
    data = Console.ReadLine();
    data = "time:" + DateTime.Now.ToString("yyyy-MM-dd HH:MM:ss") + " msg:" + data;
    byte[] bytes = MessageBuilder<string>.Create()
        .WithCode(200)
        .WithMsg("msg")
        .WithData(data)
        .Build();

    var applicationMessage = new MqttApplicationMessageBuilder()
        .WithTopic("mytopic")
        .WithPayload(bytes)
        .Build();
    var result = await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);
}

修改MQTT订阅者程序

对于订阅者程序,需要修改接收到消息的回调函数。

var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").Build();
mqttClient.ApplicationMessageReceivedAsync += e =>
{
    Message<string> message = MessageParser.Parse<string>(e.ApplicationMessage.Payload.First.Span.ToArray());
    Console.WriteLine($"接收到消息:clientid:{e.ClientId},topic:{e.ApplicationMessage.Topic},message:{message.Data}");
    return Task.CompletedTask;
};

运行结果

接下来,我们依次启动MQTT服务器进程、订阅者进程和发布者进程,然后看运行结果。

首先是服务器进程运行结果:

其次是订阅者:

最后是发布者,我们用发布者发布三条消息,并且订阅者成功收到了消息。