vbdotnetcommon:mqttclient
This is a pure VB.NET MQTT client.
No external libraries are required. It is fairly simple and has no SSL/TLS support.
- Test.vb
Option Explicit On
Option Strict On

''' <summary>
''' Console test harness for MQTT_Client: connects to a broker, subscribes once the
''' CONNACK is accepted, prints every received PUBLISH, publishes a timestamp when
''' "s" is pressed and quits when "q" is pressed.
''' </summary>
Module Module1

    Sub Main()
        Dim M As New MQTT_Client()
        AddHandler M.Connected, AddressOf hMQTTConnected

        ' M.Connect_ClientID = "HelloWorld"
        M.Connect_CleanSession = True
        M.Connect_KeepAlive = 60 * 2
        M.Connect_WillMessage = "Ut oh..."
        M.Connect_WillTopic = "Crap"
        M.SetHost("mindy", 1883)
        M.Connect()

        ' Alternative: block until the session is ready
        'While M.IsReady = False
        '    Console.Write(".")
        '    System.Threading.Thread.Sleep(100)
        'End While
        'Console.WriteLine()

        While True
            ' Keyboard commands: q = quit, s = publish the current time to "Hello"
            If Console.KeyAvailable Then
                Select Case Console.ReadKey.KeyChar
                    Case "q"c
                        Exit While
                    Case "s"c
                        M.SendPUBLISH_QoS0("Hello", System.Text.ASCIIEncoding.ASCII.GetBytes(Now.ToString("yyyyMMdd HH:mm:ss")))
                End Select
            End If

            ' Alternative: wait on the handle and drain the queue yourself
            'If System.Threading.ManualResetEvent.WaitAny({M.PUBLISHWaitHandle}, 100) <> System.Threading.WaitHandle.WaitTimeout Then
            '    While True
            '        Dim Msg As MQTT_Client.msgPUBLISH = M.GetNextPUBLISHMessage
            '        If Msg Is Nothing Then Exit While
            '        Console.WriteLine(...)
            '    End While
            'End If

            ' Wait up to 100 ms for a message; this also paces the loop
            Dim Msg As MQTT_Client.msgPUBLISH = M.WaitOnePUBLISHMessage(100)
            If Msg IsNot Nothing Then
                ' If() only evaluates the selected branch and is type-safe, unlike IIf()
                Console.WriteLine("PUBLISH: " + Msg.TopicName + ":" + Msg.H_QoSLevel.ToString + If(Msg.H_Retain, "R", "") + ": " + System.Text.ASCIIEncoding.ASCII.GetString(Msg.Payload))
            End If
        End While

        M.SendDISCONNECT()
    End Sub

    ''' <summary>Raised by the client after an accepted CONNACK; subscribes to the topics of interest.</summary>
    Private Sub hMQTTConnected(sender As MQTT_Client)
        'sender.SendSUBSCRIBE(New MQTT_Client.TopicFilters From {New MQTT_Client.TopicFilters.TopicFilterQoS("#", 0)})
        sender.SendSUBSCRIBE(New MQTT_Client.TopicFilters From {New MQTT_Client.TopicFilters.TopicFilterQoS("homie/+/sensor/#", 0)})
    End Sub

End Module
- MQTT_Client.vb
Option Explicit On
Option Strict On

''' <summary>
''' Small MQTT 3.1.1 client on top of a plain TCP socket (no TLS).
''' Connects asynchronously, sends CONNECT, parses incoming control packets with a
''' small state machine, queues received PUBLISH messages for the caller and sends
''' PINGREQ from a one-second timer when nothing has been sent for a while.
''' </summary>
Public Class MQTT_Client

    ' Dim TClient As System.Net.Sockets.TcpClient = Nothing
    Private _Socket As System.Net.Sockets.Socket = Nothing

    ' ---- Values used to build the CONNECT packet ----
    Public Property Connect_ClientID As String = ""
    Public Property Connect_UserName As String = ""
    Public Property Connect_Password As String = ""
    Public Property Connect_WillMessage As String = ""
    Public Property Connect_WillTopic As String = ""
    Public Property Connect_WillRetain As Boolean = False
    Public Property Connect_WillQOS As ConnectFlags = 0
    Public Property Connect_CleanSession As Boolean = False

    Private _Connect_KeepAlive As UInt16 = 60 * 5

    ''' <summary>
    ''' Keep-alive in seconds as sent in CONNECT. Setting it also sets the ping
    ''' interval to half of the value (or 60 when the value is 0).
    ''' </summary>
    Public Property Connect_KeepAlive As UInt16
        Get
            Return _Connect_KeepAlive
        End Get
        Set(value As UInt16)
            _Connect_KeepAlive = value
            If value > 0 Then
                topPing = CInt(value) \ 2
            Else
                topPing = 60
            End If
            If cntPing > topPing Then cntPing = topPing
        End Set
    End Property

    ''' <summary>True once the TCP connection is up and CONNECT has been sent.</summary>
    Public Property IsConnected As Boolean = False
    ''' <summary>True once the server accepted the connection (CONNACK return code 0).</summary>
    Public Property IsReady As Boolean = False
    ''' <summary>When True, a failed connect or a disconnect triggers a new Connect().</summary>
    Public Property StayConnected As Boolean = True

    Public Event Connected(sender As MQTT_Client)
    Public Event FailedConnect(sender As MQTT_Client)
    Public Event Disconnected(sender As MQTT_Client)

    Public Property Connect_Endpoint As System.Net.EndPoint

    Private PacketIdentifier As Int32 = &HC0FEUS
    Private lckPacketId As New Object

    Private Const ValidClientIDChars As String = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

    Private Const RxBuffSize As Integer = 1024 * 8
    ' Largest Remaining Length this client accepts (the protocol itself allows ~256MB)
    Private Const MaxRxPacketSize As Integer = 1024 * 1024

    Private RxBuff(0 To (RxBuffSize - 1)) As Byte
    Private RxPayload As New System.IO.MemoryStream(1024 * 8)
    Private RxPayloadLen As Integer = 0

    ' Fixed header being assembled: byte 0 = type/flags, bytes 1..4 = Remaining Length
    Private RxPayloadHeader(0 To 4) As Byte
    Private RxHeaderLen As Integer = 0

    Private Messages As New System.Collections.Generic.Queue(Of msgPUBLISH)
    Private CurrentMessage As Message = Nothing
    Private mrePUBLISH As New System.Threading.ManualResetEvent(False)
    Private lckPUBLISH As New Object

    Private tmrPing As System.Timers.Timer = Nothing
    Private lckPing As New Object
    ' Serializes socket writes: the ping timer, the receive callback and the caller can all send
    Private lckSend As New Object

    Private _topPing As Integer = 60
    ''' <summary>Number of idle seconds after which a PINGREQ is sent.</summary>
    Private Property topPing As Integer
        Get
            SyncLock lckPing
                Return _topPing
            End SyncLock
        End Get
        Set(value As Integer)
            SyncLock lckPing
                _topPing = value
            End SyncLock
        End Set
    End Property

    Private _cntPing As Integer = 60
    ''' <summary>Seconds left before the next PINGREQ.</summary>
    Private Property cntPing As Integer
        Get
            SyncLock lckPing
                Return _cntPing
            End SyncLock
        End Get
        Set(value As Integer)
            SyncLock lckPing
                _cntPing = value
            End SyncLock
        End Set
    End Property

    ''' <summary>Restarts the ping countdown; called after every packet that was sent.</summary>
    Private Sub ResetCntPing()
        ' Same lock as the timer handler, which decrements the counter on another thread
        SyncLock lckPing
            _cntPing = _topPing
        End SyncLock
    End Sub

    ''' <summary>Creates the client and starts the one-second ping timer.</summary>
    Public Sub New()
        tmrPing = New System.Timers.Timer(1000)
        tmrPing.Enabled = True
        AddHandler tmrPing.Elapsed, AddressOf hTmrPing
    End Sub

    ''' <summary>A raw control packet: first header byte plus everything after the Remaining Length.</summary>
    Public Class Message
        Private Const B_F As Byte = &HF

        Public ControlPacketTypeByte As Byte

        ''' <summary>Packet type, taken from the upper four bits of the first header byte.</summary>
        Public ReadOnly Property ControlPacketType As ControlPacketType
            Get
                Return CType(ControlPacketTypeByte >> 4, ControlPacketType)
            End Get
        End Property

        Public Payload() As Byte = {}

        Public ReadOnly Property PayloadLen As Integer
            Get
                Return Payload.Length
            End Get
        End Property

        ''' <summary>Lower four bits of the first header byte.</summary>
        Public ReadOnly Property Flags As Byte
            Get
                Return ControlPacketTypeByte And B_F
            End Get
        End Property
    End Class

    ''' <summary>Returns the next packet identifier in the range 1..65535.</summary>
    Private Function GetPacketIdentifier() As UInt16
        SyncLock lckPacketId
            PacketIdentifier += 1
            ' MQTT requires a non-zero packet identifier, so wrap to 1 rather than 0
            If PacketIdentifier > UInt16.MaxValue OrElse PacketIdentifier <= 0 Then
                PacketIdentifier = 1
            End If
            Return CUShort(PacketIdentifier)
        End SyncLock
    End Function

    ''' <summary>
    ''' Resolves <paramref name="hostname"/> and stores the endpoint used by Connect().
    ''' An IPv4 address is preferred; if the host only has other addresses the first one is used.
    ''' </summary>
    Public Sub SetHost(hostname As String, port As Integer)
        Dim Addrs() As System.Net.IPAddress = System.Net.Dns.GetHostAddresses(hostname)
        If Addrs.Length = 0 Then Throw New Exception("No addresses found for host: " + hostname)

        Dim Chosen As System.Net.IPAddress = Addrs(0)
        For Each A As System.Net.IPAddress In Addrs
            If A.AddressFamily = System.Net.Sockets.AddressFamily.InterNetwork Then
                Chosen = A
                Exit For
            End If
        Next
        Connect_Endpoint = New System.Net.IPEndPoint(Chosen, port)
    End Sub

    ' NOTE(review): both handlers below reconnect immediately, with no delay or back-off.
    Private Sub hFailedConnect() Handles Me.FailedConnect
        If Me.StayConnected Then Me.Connect()
    End Sub

    Private Sub hDisconnected() Handles Me.Disconnected
        If Me._CurrentState = States.WaitCONNACK Then Me.StayConnected = False ' Connect error
        If Me.StayConnected Then Me.Connect()
    End Sub

    ''' <summary>Runs once per second; sends a PINGREQ when the idle countdown reaches zero.</summary>
    Private Sub hTmrPing(sender As Object, e As System.Timers.ElapsedEventArgs)
        Dim Due As Boolean
        SyncLock lckPing
            ' Stop at zero so the counter cannot run away while there is no connection
            If _cntPing > 0 Then _cntPing -= 1
            Due = (_cntPing <= 0)
        End SyncLock
        If Not Due Then Exit Sub

        Try
            Me.SendPINGREQ() ' Resets the countdown if the packet was actually sent
        Catch ex As Exception
            Console.WriteLine("ERR: hTmrPing: " + ex.Message)
        End Try
    End Sub

    ''' <summary>Drops any partially received packet so a new connection starts from a clean parser.</summary>
    Private Sub ResetReceiveState()
        RxHeaderLen = 0
        RxPayloadLen = 0
        RxPayload.Position = 0
        RxPayload.SetLength(0)
        CurrentMessage = Nothing
    End Sub

    ''' <summary>
    ''' Closes any existing socket and starts an asynchronous connect to Connect_Endpoint.
    ''' The result is reported through the Connected / FailedConnect events.
    ''' </summary>
    Public Sub Connect()
        IsConnected = False
        IsReady = False

        ' Detach the old socket first so its pending callbacks see themselves as stale
        Dim Old As System.Net.Sockets.Socket = _Socket
        _Socket = Nothing
        If Old IsNot Nothing Then Old.Close()

        ResetReceiveState()

        If Me.Connect_Endpoint Is Nothing Then
            Console.WriteLine(" Connect: no endpoint set, call SetHost first")
            Me._CurrentState = States.NotConnected
            Exit Sub
        End If

        Try
            ' Match the socket family to the endpoint; IPv4 stays the default
            Dim Family As System.Net.Sockets.AddressFamily = System.Net.Sockets.AddressFamily.InterNetwork
            If Me.Connect_Endpoint.AddressFamily = System.Net.Sockets.AddressFamily.InterNetworkV6 Then
                Family = System.Net.Sockets.AddressFamily.InterNetworkV6
            End If

            Dim Sock As New System.Net.Sockets.Socket(Family, System.Net.Sockets.SocketType.Stream, System.Net.Sockets.ProtocolType.Tcp)
            Sock.ReceiveBufferSize = 1024 * 8
            Sock.SendBufferSize = 1024 * 8
            Sock.NoDelay = True
            Sock.Blocking = True 'False
            Sock.ReceiveTimeout = 2000
            '_Socket.SendTimeout = _SendTimeout
            _Socket = Sock

            Console.WriteLine(" Atempting connection to: " + Me.Connect_Endpoint.ToString)
            Me._CurrentState = States.Connecting
            Sock.BeginConnect(Me.Connect_Endpoint, AddressOf hConnectCallback, Sock)
        Catch ex As Exception
            ' FailedConnect is not raised here: its handler calls Connect() again and
            ' a failure that repeats synchronously would recurse without end.
            Console.WriteLine(" Connect: " + ex.Message)
            Me._CurrentState = States.NotConnected
        End Try
    End Sub

    ''' <summary>Completes the asynchronous connect, sends CONNECT and starts receiving.</summary>
    Private Sub hConnectCallback(ByVal ar As System.IAsyncResult)
        Dim s As System.Net.Sockets.Socket = CType(ar.AsyncState, System.Net.Sockets.Socket)
        ' A newer Connect() or a close has replaced this socket; ignore its callback
        If s IsNot _Socket Then Exit Sub

        Try
            ' EndConnect must run before the socket is used; it throws if the connect failed
            s.EndConnect(ar)
            Console.WriteLine(" Connected to: {0}, Connected={1}", s.RemoteEndPoint.ToString, s.Connected)
            Me._CurrentState = States.SendCONNECT
            SendCONNECT(s)
            IsConnected = True
            s.BeginReceive(RxBuff, 0, RxBuffSize, System.Net.Sockets.SocketFlags.None, AddressOf hReceiveCallback, s)
            ' Connected is raised later, when the CONNACK arrives
        Catch ex As Exception
            Console.Write(ex.Message + vbCrLf)
            RaiseEvent FailedConnect(Me)
        End Try
    End Sub

    ''' <summary>Closes the socket (if any) and raises Disconnected.</summary>
    Private Sub DoClose()
        IsConnected = False
        IsReady = False
        Console.WriteLine(" Closed")

        ' Clear the field before closing so the pending receive callback is seen as stale
        Dim Sock As System.Net.Sockets.Socket = _Socket
        _Socket = Nothing
        If Sock IsNot Nothing Then Sock.Close()

        RaiseEvent Disconnected(Me)
    End Sub

    ''' <summary>Completes one asynchronous receive, feeds the parser and re-arms the receive.</summary>
    Private Sub hReceiveCallback(ByVal ar As System.IAsyncResult)
        Dim s As System.Net.Sockets.Socket = CType(ar.AsyncState, System.Net.Sockets.Socket)
        ' Callback of a socket that was already closed or replaced: nothing to do
        If s IsNot _Socket Then Exit Sub

        Dim MustClose As Boolean = False
        Try
            Dim BytesReceived As Integer = 0
            If s.Connected Then BytesReceived = s.EndReceive(ar)

            If BytesReceived > 0 Then
                Dim DoDisconnect As Boolean = False
                ProcessRxBuffer(BytesReceived, 0, DoDisconnect)
                If DoDisconnect Then
                    Console.WriteLine("ERR: DoDisconnect: ")
                    MustClose = True
                Else
                    s.BeginReceive(RxBuff, 0, RxBuffSize, System.Net.Sockets.SocketFlags.None, AddressOf hReceiveCallback, s)
                End If
            Else
                ' Zero bytes means the peer closed the connection
                Console.WriteLine("ERR: BytesReceived=0: ")
                MustClose = True
            End If
        Catch ex As Exception
            Console.WriteLine("ERR: hReceiveCallback: " + ex.Message)
            MustClose = True
        End Try

        ' Close outside the Try so an error in a Disconnected handler is not treated as a receive error
        If MustClose Then DoClose()
    End Sub

    ''' <summary>
    ''' Parses RxBuff(ByteOffset .. BytesReceived-1). Handles any number of packets per
    ''' buffer and packets (including their fixed header) that span several receives.
    ''' Sets <paramref name="DoDisconnect"/> when the data is malformed or a handler asks to disconnect.
    ''' </summary>
    Private Sub ProcessRxBuffer(BytesReceived As Integer, ByteOffset As Integer, ByRef DoDisconnect As Boolean)
        While Not DoDisconnect
            Select Case _CurrentState
                Case States.WaitControlPacket, States.WaitCONNACK
                    ' Collect the fixed header one byte at a time; it may arrive in pieces
                    If ByteOffset >= BytesReceived Then Exit Sub

                    RxPayloadHeader(RxHeaderLen) = RxBuff(ByteOffset)
                    RxHeaderLen += 1
                    ByteOffset += 1

                    If RxHeaderLen = 1 Then
                        If FlagsLookOk(RxPayloadHeader(0)) = False Then
                            DoDisconnect = True
                            Exit Sub
                        End If
                        Continue While ' Now go for the length bytes
                    End If

                    Dim OutSize As Integer = 0
                    Dim BytesConsumed As Integer = BytesToLength(RxPayloadHeader, 1, RxHeaderLen, OutSize)
                    If BytesConsumed = -1 Then Continue While ' Length not complete yet

                    If BytesConsumed <= 0 OrElse BytesConsumed > 4 OrElse OutSize > MaxRxPacketSize Then
                        DoDisconnect = True
                        Exit Sub
                    End If

                    ' Header complete
                    CurrentMessage = New Message With {.ControlPacketTypeByte = RxPayloadHeader(0)}
                    RxPayloadLen = OutSize
                    RxHeaderLen = 0
                    _CurrentState = States.ReceivingControlPacket
                    ' Loop on even without more bytes: a zero-length packet is already complete

                Case States.ReceivingControlPacket
                    Dim BytesToRead As Integer = CInt(RxPayloadLen - RxPayload.Position)
                    Dim BytesAvail As Integer = BytesReceived - ByteOffset
                    If BytesToRead > BytesAvail Then BytesToRead = BytesAvail

                    RxPayload.Write(RxBuff, ByteOffset, BytesToRead)
                    ByteOffset += BytesToRead

                    ' Not complete yet: wait for the next receive
                    If RxPayload.Position < RxPayloadLen Then Exit Sub

                    Dim Done As Message = CurrentMessage
                    Done.Payload = RxPayload.ToArray
                    ' Reset the parser before dispatching so it is clean even if a handler throws
                    CurrentMessage = Nothing
                    RxPayload.Position = 0
                    RxPayload.SetLength(0)
                    _CurrentState = States.WaitControlPacket

                    HandleMessage(Done, DoDisconnect)

                Case Else
                    DoDisconnect = True
            End Select
        End While
    End Sub

    ''' <summary>Dispatches one complete control packet.</summary>
    Private Sub HandleMessage(Msg As Message, ByRef DoDisconnect As Boolean)
        Select Case Msg.ControlPacketType
            Case ControlPacketType.CONNACK
                If Msg.PayloadLen <> msgCONNACK.FIXEDLENGTH Then
                    DoDisconnect = True
                    Exit Sub
                End If
                Dim M As New msgCONNACK(Msg.Payload)
                Console.WriteLine("CONNACK: " + M.ReturnCode.ToString)
                If M.ReturnCode <> msgCONNACK.ConnectReturnCode.ConnectionAccepted Then
                    DoDisconnect = True
                    Me.StayConnected = False ' It didn't like us, so don't hammer
                    Exit Sub
                End If
                IsReady = True
                RaiseEvent Connected(Me)

            Case ControlPacketType.PUBLISH
                Dim Pub As New msgPUBLISH(Msg)
                ' A QoS 1 PUBLISH must be acknowledged or the server keeps re-sending it
                If Pub.H_QoSLevel = 1 Then SendPUBACK(Pub.PacketIdentifier)
                NotifyPUBLISH(Pub)

            Case ControlPacketType.PUBACK

            Case ControlPacketType.PINGREQ
                ' Not likely, as the Client is supposed to originate, not the server...
                SendPINGRESP()

            Case ControlPacketType.PINGRESP

            Case ControlPacketType.SUBACK

            Case ControlPacketType.UNSUBACK
        End Select
    End Sub

    ''' <summary>Queues a received PUBLISH and signals the wait handle.</summary>
    Private Sub NotifyPUBLISH(M As msgPUBLISH)
        SyncLock lckPUBLISH
            Messages.Enqueue(M)
            ' Set inside the lock so a concurrent dequeue cannot reset the event in between
            mrePUBLISH.Set()
        End SyncLock
    End Sub

    ''' <summary>Returns a message if present, or Nothing if none</summary>
    Public Function GetNextPUBLISHMessage() As msgPUBLISH
        Dim RetVal As msgPUBLISH = Nothing
        SyncLock lckPUBLISH
            If Messages.Count > 0 Then RetVal = Messages.Dequeue
            If Messages.Count = 0 Then mrePUBLISH.Reset()
        End SyncLock
        Return RetVal
    End Function

    ''' <summary>Wait handle that is signaled while received PUBLISH messages are queued.</summary>
    Public ReadOnly Property PUBLISHWaitHandle As System.Threading.EventWaitHandle
        Get
            Return Me.mrePUBLISH
        End Get
    End Property

    ''' <summary>Returns a message if present, or Nothing if times out</summary>
    Public Function WaitOnePUBLISHMessage(millisecondsTimeout As Integer) As msgPUBLISH
        If Me.mrePUBLISH.WaitOne(millisecondsTimeout, False) Then
            Return Me.GetNextPUBLISHMessage
        End If
        Return Nothing
    End Function

    Private _CurrentState As States = States.NotConnected

    Public Enum States As Integer
        NotConnected = 0
        Connecting
        SendCONNECT
        WaitCONNACK
        'ReceivingCONNACK
        ' Normal states
        WaitControlPacket       ' Waiting for (the rest of) a fixed header. We always come back here
        ReceivingControlPacket  ' We are here while more data is coming in
    End Enum

    Public Enum ControlPacketType As Byte
        Reserved_0 = 0      ' Forbidden Reserved
        CONNECT = 1         ' Client to Server: Client request to connect to Server
        CONNACK = 2         ' Server to Client: Connect acknowledgment
        PUBLISH = 3         ' Either direction: Publish message
        PUBACK = 4          ' Either direction: Publish acknowledgment
        PUBREC = 5          ' Either direction: Publish received (assured delivery part 1)
        PUBREL = 6          ' Either direction: Publish release (assured delivery part 2)
        PUBCOMP = 7         ' Either direction: Publish complete (assured delivery part 3)
        SUBSCRIBE = 8       ' Client to Server: Client subscribe request
        SUBACK = 9          ' Server to Client: Subscribe acknowledgment
        UNSUBSCRIBE = 10    ' Client to Server: Unsubscribe request
        UNSUBACK = 11       ' Server to Client: Unsubscribe acknowledgment
        PINGREQ = 12        ' Client to Server: PING request
        PINGRESP = 13       ' Server to Client: PING response
        DISCONNECT = 14     ' Client to Server: Client is disconnecting
        Reserved_15 = 15    ' Forbidden Reserved
    End Enum

    ''' <summary>List of topic filter / QoS pairs for a SUBSCRIBE packet.</summary>
    Public Class TopicFilters : Inherits System.Collections.Generic.List(Of TopicFilterQoS)

        Public Class TopicFilterQoS
            Public Property TopicFilter As String
            Public Property QoS As Byte

            Public Sub New()
            End Sub

            Public Sub New(TopicFilter As String, QoS As Byte)
                Me.TopicFilter = TopicFilter
                Me.QoS = QoS
            End Sub

            ''' <summary>Writes the length-prefixed filter followed by the QoS byte.</summary>
            Public Sub Serialize(Stm As System.IO.Stream)
                UTF8.WriteStringToStream(Stm, Me.TopicFilter)
                Stm.WriteByte(Me.QoS)
            End Sub
        End Class

        ''' <summary>Writes every entry to the stream in list order.</summary>
        Public Sub Serialize(Stm As System.IO.Stream)
            For Each T As TopicFilterQoS In Me
                T.Serialize(Stm)
            Next
        End Sub
    End Class

    Public Class msgCONNACK
        Public Const FIXEDLENGTH As Int32 = 2

        Public Property AcknowledgeFlags As Byte
        Public Property ReturnCode As ConnectReturnCode

        Public Enum ConnectReturnCode As Byte
            ConnectionAccepted = 0
            ConnectionRefused_UnacceptableProtocolVersion = 1
            ConnectionRefused_IdentifierRejected = 2
            ConnectionRefused_ServerUnavailable = 3
            ConnectionRefused_BadUsernameOrPassword = 4
            ConnectionRefused_NotAuthorized = 5
        End Enum

        Public Sub New()
        End Sub

        ''' <summary>Reads the two CONNACK bytes: acknowledge flags, then return code.</summary>
        Public Sub New(Payload() As Byte)
            Me.AcknowledgeFlags = Payload(0)
            Me.ReturnCode = CType(Payload(1), ConnectReturnCode)
        End Sub
    End Class

    Public Class msgPUBLISH
        Public Property H_DUPFlag As Boolean
        Public Property H_QoSLevel As Byte
        Public Property H_Retain As Boolean

        Public TopicName As String
        Public PacketIdentifier As UInt16
        Public Payload() As Byte = {}

        ''' <summary>Decodes the application payload as ASCII text.</summary>
        Public Function PayloadToString() As String
            Return System.Text.ASCIIEncoding.ASCII.GetString(Me.Payload)
        End Function

        Public Sub New()
        End Sub

        ''' <summary>
        ''' Parses a PUBLISH packet: header flags, topic name, packet identifier
        ''' (present only when QoS is above 0) and the remaining application payload.
        ''' </summary>
        Public Sub New(Msg As Message)
            ' Flag bits: 3 = DUP, 2..1 = QoS, 0 = RETAIN
            H_DUPFlag = (Msg.Flags And (&H1 << 3)) <> 0
            H_QoSLevel = CByte((Msg.Flags And (&H3 << 1)) >> 1)
            H_Retain = (Msg.Flags And (&H1 << 0)) <> 0

            Dim Offset As Integer = 0

            ' Get Topic Name
            Dim TN As New UTF8(Msg.Payload, Offset)
            Offset += TN.TotalLength
            Me.TopicName = TN.ToString

            ' Packet Identifier?
            If Me.H_QoSLevel > 0 Then
                If Offset + 2 > Msg.PayloadLen Then Throw New Exception("Too few bytes")
                Me.PacketIdentifier = ReadUInt16_BE(Msg.Payload, Offset)
                Offset += 2
            End If

            ' The rest of the payload
            Dim FinalPayloadLen As Integer = Msg.PayloadLen - Offset
            Array.Resize(Me.Payload, FinalPayloadLen)
            Array.Copy(Msg.Payload, Offset, Me.Payload, 0, FinalPayloadLen)
        End Sub
    End Class

    Public Class msgPINGREQ
        Public Sub New()
        End Sub

        Public Sub New(Msg As Message)
            ' No variable header and no payload
        End Sub
    End Class

    Public Class msgPINGRESP
        Public Sub New()
        End Sub

        Public Sub New(Msg As Message)
            ' No variable header and no payload
        End Sub
    End Class

    <Flags()>
    Public Enum ConnectFlags As Byte
        UserNameFlag = 1 << 7
        PasswordFlag = 1 << 6
        WillRetain = 1 << 5
        WillQoS_1 = 1 << 3
        WillQoS_2 = 2 << 3
        WillFlag = 1 << 2
        CleanSession = 1 << 1
        Reserved = 1 << 0
    End Enum

    ''' <summary>A string in wire format: two-byte big-endian length followed by UTF-8 bytes.</summary>
    Public Class UTF8
        Private _BA() As Byte = {}

        ''' <summary>Size on the wire: the two length bytes plus the encoded text.</summary>
        Public ReadOnly Property TotalLength As Integer
            Get
                Return 2 + _BA.Length
            End Get
        End Property

        ''' <summary>Encodes a string; Nothing is treated as an empty string.</summary>
        Public Sub New(S As String)
            If S Is Nothing Then S = ""
            _BA = System.Text.UTF8Encoding.UTF8.GetBytes(S)
            If _BA.Length > UInt16.MaxValue Then Throw New Exception("Too many bytes")
        End Sub

        ''' <summary>Takes everything after the first two bytes of <paramref name="B"/> as the text.</summary>
        Public Sub New(B() As Byte)
            ' Assumes first two bytes are length
            Dim BLen As Integer = B.Length
            Dim BLen_m2 As Integer = BLen - 2
            If BLen_m2 < 0 Then Throw New Exception("Too few bytes")
            If BLen_m2 > UInt16.MaxValue Then Throw New Exception("Too many bytes")
            Array.Resize(_BA, BLen_m2)
            Array.Copy(B, 2, _BA, 0, BLen_m2)
        End Sub

        ''' <summary>Reads a length-prefixed string starting at <paramref name="Offset"/>.</summary>
        Public Sub New(B() As Byte, Offset As Integer)
            ' Validate against the buffer so malformed packets give a clear error
            If Offset < 0 OrElse Offset + 2 > B.Length Then Throw New Exception("Too few bytes")
            Dim BLen As Integer = ReadUInt16_BE(B, Offset)
            If Offset + 2 + BLen > B.Length Then Throw New Exception("Too few bytes")
            Array.Resize(_BA, BLen)
            Array.Copy(B, Offset + 2, _BA, 0, BLen)
        End Sub

        ''' <summary>Reads a length-prefixed string from a stream.</summary>
        Public Sub New(Stm As System.IO.Stream)
            Dim BLen As Integer = ReadUInt16_BE(Stm)
            Array.Resize(_BA, BLen)
            ReadExact(Stm, _BA, BLen)
        End Sub

        Public Overrides Function ToString() As String
            Return System.Text.UTF8Encoding.UTF8.GetString(_BA)
        End Function

        ''' <summary>Returns the wire representation (length prefix plus text bytes).</summary>
        Public Function GetByteArray() As Byte()
            Dim BA2() As Byte = {}
            Dim BLen As Integer = _BA.Length
            Array.Resize(BA2, BLen + 2)
            Array.Copy(_BA, 0, BA2, 2, BLen)
            WriteUInt16_BE(BA2, 0, CUShort(BLen))
            Return BA2
        End Function

        ''' <summary>Writes the wire representation to a stream.</summary>
        Public Sub WriteToStream(Stm As System.IO.Stream)
            Dim BLen As Integer = _BA.Length
            WriteUInt16_BE(Stm, CUShort(BLen))
            Stm.Write(_BA, 0, BLen)
        End Sub

        Public Shared Sub WriteStringToStream(Stm As System.IO.Stream, S As String)
            Dim U As New UTF8(S)
            U.WriteToStream(Stm)
        End Sub
    End Class

    ''' <summary>Checks the flag bits of the first header byte against the packet type.</summary>
    Private Shared Function FlagsLookOk(B As Byte) As Boolean
        Const B_F As Byte = &HF
        Dim CPT As ControlPacketType = CType(B >> 4, ControlPacketType)
        Dim Flags As Byte = B And B_F

        Select Case CPT
            Case ControlPacketType.PUBLISH
                ' DUP / QoS / RETAIN are free, but both QoS bits set (QoS 3) is not allowed
                Return (Flags And 6) <> 6
            Case ControlPacketType.PUBREL, ControlPacketType.SUBSCRIBE, ControlPacketType.UNSUBSCRIBE
                Return Flags = 2 ' Reserved 0010
            Case ControlPacketType.CONNECT, ControlPacketType.CONNACK, ControlPacketType.PUBACK,
                 ControlPacketType.PUBREC, ControlPacketType.PUBCOMP, ControlPacketType.SUBACK,
                 ControlPacketType.UNSUBACK, ControlPacketType.PINGREQ, ControlPacketType.PINGRESP,
                 ControlPacketType.DISCONNECT
                Return Flags = 0 ' Reserved 0000
        End Select
        Return False ' Reserved packet types 0 and 15
    End Function

    ''' <summary>Reads exactly <paramref name="Count"/> bytes; Stream.Read may return fewer per call.</summary>
    Private Shared Sub ReadExact(Stm As System.IO.Stream, Buf() As Byte, Count As Integer)
        Dim Got As Integer = 0
        While Got < Count
            Dim N As Integer = Stm.Read(Buf, Got, Count - Got)
            If N <= 0 Then Throw New System.IO.EndOfStreamException("Stream ended before all bytes were read")
            Got += N
        End While
    End Sub

    Private Shared Sub WriteUInt16_BE(Arr() As Byte, Offset As Int32, Value As UInt16)
        Arr(Offset + 1) = CByte(Value And 255US)
        Value = Value >> 8
        Arr(Offset) = CByte(Value And 255US)
    End Sub

    Private Shared Sub WriteUInt16_BE(Stm As System.IO.Stream, Value As UInt16)
        Dim BA() As Byte = {0, 0}
        WriteUInt16_BE(BA, 0, Value)
        Stm.Write(BA, 0, 2)
    End Sub

    ''' <summary>Reads a big-endian 16-bit value from Arr(Offset) and Arr(Offset + 1).</summary>
    Public Shared Function ReadUInt16_BE(Arr() As Byte, Offset As Int32) As UInt16
        Dim V As UInt16 = Arr(Offset + 1)
        V = V Or (CUShort(Arr(Offset + 0)) << 8)
        Return V
    End Function

    ''' <summary>Reads a big-endian 16-bit value from a stream.</summary>
    Public Shared Function ReadUInt16_BE(Stm As System.IO.Stream) As UInt16
        Dim BA(0 To 1) As Byte
        ReadExact(Stm, BA, 2)
        Return ReadUInt16_BE(BA, 0)
    End Function

    ''' <summary>
    ''' Builds fixed header + payload and sends it. Does nothing when there is no
    ''' connected socket. Restarts the ping countdown after a successful send.
    ''' </summary>
    Private Sub SendControlPacket(CPT As ControlPacketType, Flags As Byte, Payload() As Byte)
        Const B15 As Byte = 15

        ' Work on a local copy: the field can be cleared by another thread at any time
        Dim Sock As System.Net.Sockets.Socket = _Socket
        If Sock Is Nothing Then Exit Sub
        If Sock.Connected = False Then Exit Sub

        If Payload Is Nothing Then Payload = New Byte() {}
        Dim PayloadLen As Integer = Payload.Length
        Dim LenBA() As Byte = LengthToBytes(PayloadLen)

        Dim OutBA() As Byte = {}
        Array.Resize(OutBA, 1 + LenBA.Length + PayloadLen)
        OutBA(0) = (CByte(CPT) << 4) Or (Flags And B15)
        Array.Copy(LenBA, 0, OutBA, 1, LenBA.Length)
        Array.Copy(Payload, 0, OutBA, 1 + LenBA.Length, PayloadLen)

        ' One packet at a time on the wire
        SyncLock lckSend
            Sock.Send(OutBA)
        End SyncLock
        ResetCntPing()
    End Sub

    ''' <summary>Builds and sends the CONNECT packet from the Connect_* properties.</summary>
    Private Sub SendCONNECT(_S As System.Net.Sockets.Socket)
        If _S.Connected = False Then Exit Sub

        Dim HasWill As Boolean = Not String.IsNullOrEmpty(Connect_WillMessage)
        Dim HasUser As Boolean = Not String.IsNullOrEmpty(Connect_UserName)
        Dim HasPassword As Boolean = Not String.IsNullOrEmpty(Connect_Password)

        ' Variable Header
        Dim MS_V As New System.IO.MemoryStream(256)
        UTF8.WriteStringToStream(MS_V, "MQTT") ' Protocol Name, bytes 1-6
        MS_V.WriteByte(4)                      ' Protocol Level, byte 7

        ' Build Connect Flags
        Dim CF As ConnectFlags = 0
        If Connect_CleanSession Then CF = CF Or ConnectFlags.CleanSession
        ' NOTE(review): MQTT 3.1.1 expects the password flag only together with the user name flag
        If HasPassword Then CF = CF Or ConnectFlags.PasswordFlag
        If HasUser Then CF = CF Or ConnectFlags.UserNameFlag
        If HasWill Then
            ' Will QoS and Will Retain must stay 0 unless the Will flag is set
            CF = CF Or ConnectFlags.WillFlag
            CF = CF Or (Connect_WillQOS And (ConnectFlags.WillQoS_1 Or ConnectFlags.WillQoS_2))
            If Connect_WillRetain Then CF = CF Or ConnectFlags.WillRetain
        End If
        MS_V.WriteByte(CByte(CF))              ' byte 8
        WriteUInt16_BE(MS_V, Connect_KeepAlive) ' bytes 9-10

        ' Payload: Client Identifier, Will Topic, Will Message, User Name, Password
        UTF8.WriteStringToStream(MS_V, Connect_ClientID)
        If HasWill Then
            UTF8.WriteStringToStream(MS_V, Connect_WillTopic)
            UTF8.WriteStringToStream(MS_V, Connect_WillMessage)
        End If
        If HasUser Then
            UTF8.WriteStringToStream(MS_V, Connect_UserName)
        End If
        If HasPassword Then
            UTF8.WriteStringToStream(MS_V, Connect_Password) ' Should be binary...
        End If

        Me._CurrentState = States.WaitCONNACK ' Set before to avoid race condition
        SendControlPacket(ControlPacketType.CONNECT, 0, MS_V.ToArray)
    End Sub

    ''' <summary>Publishes <paramref name="Payload"/> to <paramref name="Topic"/> at QoS 0.</summary>
    Public Sub SendPUBLISH_QoS0(Topic As String, Payload() As Byte, Optional Retain As Boolean = False)
        Const B_1 As Byte = 1
        If Payload Is Nothing Then Payload = New Byte() {}

        Dim MS_V As New System.IO.MemoryStream(256 + Payload.Length)
        Dim Flags As Byte = 0
        If Retain Then Flags = Flags Or B_1

        ' Topic, no packet identifier at QoS 0, then the payload
        UTF8.WriteStringToStream(MS_V, Topic)
        MS_V.Write(Payload, 0, Payload.Length)

        SendControlPacket(ControlPacketType.PUBLISH, Flags, MS_V.ToArray)
    End Sub

    ''' <summary>Sends a SUBSCRIBE for the given filters. Nothing is sent for an empty list.</summary>
    Public Sub SendSUBSCRIBE(TopicFilters As TopicFilters)
        ' A SUBSCRIBE without any topic filter is a protocol violation
        If TopicFilters Is Nothing OrElse TopicFilters.Count = 0 Then
            Console.WriteLine("SendSUBSCRIBE: no topic filters, nothing sent")
            Exit Sub
        End If

        Dim MS_V As New System.IO.MemoryStream(256)
        ' Packet Identifier (should save and ref it...)
        WriteUInt16_BE(MS_V, GetPacketIdentifier())
        ' Topics / QoSs
        TopicFilters.Serialize(MS_V)

        SendControlPacket(ControlPacketType.SUBSCRIBE, 2, MS_V.ToArray)
    End Sub

    ''' <summary>Acknowledges a received QoS 1 PUBLISH.</summary>
    Private Sub SendPUBACK(PacketId As UInt16)
        Dim Body() As Byte = {0, 0}
        WriteUInt16_BE(Body, 0, PacketId)
        SendControlPacket(ControlPacketType.PUBACK, 0, Body)
    End Sub

    Public Sub SendPINGREQ()
        SendControlPacket(ControlPacketType.PINGREQ, 0, New Byte() {})
    End Sub

    Public Sub SendPINGRESP()
        SendControlPacket(ControlPacketType.PINGRESP, 0, New Byte() {})
    End Sub

    ''' <summary>Sends DISCONNECT, closes the socket and turns automatic reconnect off.</summary>
    Public Sub SendDISCONNECT()
        Me.StayConnected = False
        Dim Problem As String = ""
        Try
            SendControlPacket(ControlPacketType.DISCONNECT, 0, New Byte() {})
        Catch ex As Exception
            ' Still close below even if the packet could not be sent
            Problem = ex.Message
        End Try
        Console.WriteLine("SendDISCONNECT: " + Problem)
        DoClose()
    End Sub

    ''' <summary>Encodes a Remaining Length (0 .. 268,435,455) as 1 to 4 bytes, low 7 bits first.</summary>
    Private Shared Function LengthToBytes(V As Int32) As Byte()
        Const B128 As Byte = 128
        ' Outside this range the value cannot be encoded (a negative value would never reach 0)
        If V < 0 OrElse V > 268435455 Then Throw New ArgumentOutOfRangeException("V")

        Dim Out As New System.Collections.Generic.List(Of Byte)(4)
        Do
            Dim B As Byte = CByte(V And &H7F)
            V = V >> 7
            If V > 0 Then B = B Or B128 ' Continuation bit: more bytes follow
            Out.Add(B)
        Loop While V > 0
        Return Out.ToArray
    End Function

    ''' <summary>
    ''' Decodes a Remaining Length starting at BA(Offset), reading no further than BufferLen.
    ''' Returns the number of bytes consumed (1-4), -1 when more data is needed,
    ''' or -2 when the fourth byte still has the continuation bit set.
    ''' </summary>
    Private Shared Function BytesToLength(BA() As Byte, ByVal Offset As Integer, BufferLen As Integer, ByRef OutSize As Integer) As Integer
        Const B128 As Byte = 128
        Const B127 As Byte = 127

        OutSize = 0
        ' LSB group is sent first; at most four bytes are allowed
        For i As Integer = 0 To 3
            If Offset + i >= BufferLen Then Return -1
            Dim B As Byte = BA(Offset + i)
            OutSize = OutSize Or (CInt(B And B127) << (7 * i))
            If (B And B128) = 0 Then Return i + 1
        Next
        Return -2
    End Function

End Class
vbdotnetcommon/mqttclient.txt · Last modified: 2025/07/09 11:34 by srbios