4 通用MQTT协议
通用MQTT为网关默认的推送协议,推送主题为 data/{gateway_id}/v1,其中 gateway_id 为网关信息里的网关ID。推送格式如下:
[
{
"device_id": "Test",
"ts": 1561994655032,
"device_type": "fanuc",
"values": [
{
"name": "cnc_products",
"value": 20559
},
{
"name": "cnc_type",
"value": "FANUC 0i MF"
},
{
"name": "device_state",
"value": 0
},
{
"name": "cnc_mecpos",
"value": [
{
"axis": "X",
"value": -84.4
},
{
"axis": "Y",
"value": 2.986
},
{
"axis": "Z",
"value": -108.181
}
]
},
{
"name": "cnc_alarm",
"value": [
{
"alarm_no": 1,
"alarm_type": "T01",
"alarm_msg": "待机中"
},
{
"alarm_no": 204,
"alarm_type": "XXX",
"alarm_msg": "急停"
}
]
}
]
}
]
4.0.1 订阅Demo
4.0.1.1 Java
Demo
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.fusesource.mqtt.client.*;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
class Alarm {
public String alarm_type;
public String alarm_msg;
public int alarm_no;
}
/**
 * One entry of an axis-position value array (e.g. "cnc_mecpos").
 * Field names mirror the JSON keys so Gson can bind them without annotations.
 */
class Position {
    /** Coordinate of the axis (JSON key "value"); null when absent. */
    public Double value;
    /** Axis label such as "X", "Y" or "Z" (JSON key "axis"). */
    public String axis;
}
/**
 * A single named data point of a device.
 * The payload of "value" is either a scalar or an array of objects, so it is
 * kept as an untyped Object and inspected by the consumer.
 */
class Value {
    /** Scalar or array payload (JSON key "value"). */
    public Object value;
    /** Data point name, e.g. "cnc_alarm" (JSON key "name"). */
    public String name;
}
/**
 * One device record of a gateway message; the message itself is a JSON array
 * of these records. Field names mirror the JSON keys for Gson binding.
 */
class Root {
    /** Device identifier (JSON key "device_id"). */
    public String device_id;
    /** Device type, e.g. "fanuc" (JSON key "device_type"). */
    public String device_type;
    /** Timestamp of the record (JSON key "ts"). */
    public long ts;
    /** Data points reported for the device (JSON key "values"). */
    public Value[] values;
}
public class JavaDemo {
final static String TOPIC_NAME = "data/+/v1";
final static String IP = "127.0.0.1";
final static int PORT = 1883;
public static void run() throws Exception {
= new MQTT();
MQTT mqtt .setHost(IP, PORT);
mqtt= mqtt.blockingConnection();
BlockingConnection connection .connect();
connectionSystem.out.println("Connected to Broker!");
[] topics = {new Topic(TOPIC_NAME, QoS.EXACTLY_ONCE)};
Topic.subscribe(topics);
connectionType AlarmType = new TypeToken<ArrayList<Alarm>>() {
}.getType();
Type PositionType = new TypeToken<ArrayList<Position>>() {
}.getType();
while (true) {
= connection.receive(10, TimeUnit.SECONDS);
Message message if (message != null) {
Type RooList = new TypeToken<ArrayList<Root>>() {
}.getType();
ArrayList<Root> table = new Gson().fromJson(new String(message.getPayload()), RooList);
for (Root tt : table) {
System.out.println(tt.device_id);
for (Value value : tt.values) {
System.out.print(value.name + "\t");
if (value.value instanceof java.util.ArrayList) {
if (value.name.equals("cnc_alarm")) {
ArrayList<Alarm> alarms = new Gson().fromJson(value.value.toString(), AlarmType);
for (Alarm alarm : alarms) {
System.out.print("\nalarm_no:" + alarm.alarm_no + "\talarm_type:" + alarm.alarm_type + "\talarm_msg:" + alarm.alarm_msg);
}
} else {
ArrayList<Position> position = new Gson().fromJson(value.value.toString(), PositionType);
for (Position axis : position) {
System.out.print(axis.axis + ":" + axis.value + "\t");
}
}
} else {
System.out.print(value.value);
}
System.out.println();
}
}
.ack();
message}
}
}
public static void main(String[] args) throws Exception {
.run();
JavaDemo}
}
4.0.1.2 .NET Core
Demo
using System;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System.Text;
using System.Collections.Generic;
namespace dotnet_demo
{
/// <summary>
/// One entry of an axis-position value array (e.g. "cnc_mecpos").
/// Property names mirror the JSON keys used in the gateway message.
/// </summary>
public class Position
{
    /// <summary>Axis label such as "X", "Y" or "Z".</summary>
    public string axis { get; set; }

    /// <summary>Coordinate of the axis, held as text.</summary>
    public string value { get; set; }
}
/// <summary>
/// One entry of the "cnc_alarm" value array.
/// Property names mirror the JSON keys used in the gateway message.
/// </summary>
public class Alarm
{
    /// <summary>Alarm number.</summary>
    public int alarm_no { get; set; }

    /// <summary>Alarm category code.</summary>
    public string alarm_type { get; set; }

    /// <summary>Human-readable alarm text.</summary>
    public string alarm_msg { get; set; }
}
/// <summary>
/// A single named data point of a device. The payload is either a scalar or
/// an array of objects, so it is exposed as an untyped object.
/// </summary>
public class Values
{
    /// <summary>Data point name, e.g. "cnc_alarm".</summary>
    public string name { get; set; }

    /// <summary>Scalar or array payload of the data point.</summary>
    public object value { get; set; }
}
/// <summary>
/// One device record of a gateway message; the message itself is a JSON
/// array of these records.
/// </summary>
public class Root
{
    /// <summary>Device identifier.</summary>
    public string device_id { get; set; }

    /// <summary>Timestamp of the record.</summary>
    public long ts { get; set; }

    /// <summary>Device type, e.g. "fanuc".</summary>
    public string device_type { get; set; }

    /// <summary>Data points reported for the device.</summary>
    public List<Values> values { get; set; }
}
/// <summary>
/// Minimal subscriber demo: connects to the MQTT broker, subscribes to the
/// gateway data topic and prints every received device record to the console.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        // "+" matches any gateway id.
        string topicSubscribe = "data/+/v1";
        MqttClient mqttClient = new MqttFactory().CreateMqttClient() as MqttClient;

        // Subscribe every time the connection is established.
        mqttClient.UseConnectedHandler(async handle =>
        {
            await mqttClient.SubscribeAsync(new MqttTopicFilter()
            {
                Topic = topicSubscribe,
                QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce
            });
        });

        mqttClient.UseApplicationMessageReceivedHandler(handle =>
        {
            byte[] raw = handle.ApplicationMessage.Payload;
            if (raw == null)
            {
                // Empty message: nothing to decode.
                return;
            }

            // Decode explicitly as UTF-8 rather than the runtime-dependent
            // Encoding.Default.
            var payload = Encoding.UTF8.GetString(raw);
            List<Root> table = Newtonsoft.Json.JsonConvert.DeserializeObject<List<Root>>(payload);
            if (table == null)
            {
                return;
            }

            foreach (var m in table)
            {
                foreach (var s in m.values)
                {
                    Console.Write(s.name + "\t");
                    // "is" is also false for null, so a JSON null value does
                    // not throw the way s.value.GetType() would.
                    if (s.value is Newtonsoft.Json.Linq.JArray)
                    {
                        if (s.name == "cnc_alarm")
                        {
                            List<Alarm> alarms = Newtonsoft.Json.JsonConvert
                                .DeserializeObject<List<Alarm>>(s.value.ToString());
                            foreach (var alarm in alarms)
                            {
                                Console.WriteLine("\nalarm_no:" + alarm.alarm_no.ToString() + "\talarm_type:" + alarm.alarm_type + "\talarm_msg:" + alarm.alarm_msg);
                            }
                        }
                        else
                        {
                            List<Position> pos = Newtonsoft.Json.JsonConvert
                                .DeserializeObject<List<Position>>(s.value.ToString());
                            foreach (var axis in pos)
                            {
                                Console.Write(axis.axis + ":" + axis.value.ToString() + "\t");
                            }
                        }
                    }
                    else
                    {
                        Console.Write(s.value);
                    }
                    Console.WriteLine();
                }
            }
        });

        var options = new MqttClientOptionsBuilder()
            .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
            .WithClientId(Guid.NewGuid().ToString().Substring(0, 5))
            .WithTcpServer("127.0.0.1", 1883)
            .WithCleanSession()
            .Build();

        // Wait for the connect to finish so a failure is reported here
        // instead of being lost in an unobserved task.
        mqttClient.ConnectAsync(options).GetAwaiter().GetResult();

        // Keep the process alive until a key is pressed.
        Console.ReadKey();
    }
}
}
4.0.1.3 Python
Demo
import json
import paho.mqtt.client as mqtt
def on_connect(mqtt_client, userdata, flags, rc):
    """Connection callback: subscribe to the data topic of every gateway.

    Subscribing here (rather than once after ``connect``) means the
    subscription is issued each time this callback runs.

    :param mqtt_client: the client instance that connected
    :param userdata: user data attached to the client (unused)
    :param flags: broker response flags (unused)
    :param rc: connection result code (unused)
    """
    # "+" is the single-level wildcard, so this matches any gateway id.
    mqtt_client.subscribe('data/+/v1')
def on_message(mqtt_client, userdata, msg):
    """Message callback: decode a gateway payload and print it to stdout.

    The payload is a JSON array of device records. For each record the
    device id is printed, followed by one line per data point. Array-valued
    data points are expanded: ``cnc_alarm`` entries as alarm lines, any other
    array as ``axis:value`` pairs.

    Any exception raised while decoding or printing is caught and printed so
    that one malformed message does not stop the client loop.

    :param mqtt_client: the client instance that received the message (unused)
    :param userdata: user data attached to the client (unused)
    :param msg: received message; only ``msg.payload`` is read
    """
    try:
        table = json.loads(msg.payload)
        for tt in table:
            print(tt.get('device_id'))
            for value in tt.get('values'):
                print(value.get('name'), end='\t')
                data = value.get('value')
                if isinstance(data, list):
                    if value.get('name') == 'cnc_alarm':
                        for alarm in data:
                            # Every literal needs its own f prefix: adjacent
                            # literals are concatenated, but only f-prefixed
                            # parts are interpolated.
                            print(
                                f'\nalarm_no:{alarm.get("alarm_no")}\t'
                                f'alarm_type:{alarm.get("alarm_type")}\t'
                                f'alarm_msg:{alarm.get("alarm_msg")}',
                                end='')
                    else:
                        for axis in data:
                            print(f'{axis.get("axis")}:{axis.get("value")}', end='\t')
                else:
                    print(data, end='')
                # Terminate the line of the current data point.
                print('')
    except Exception as e:
        print(e)
# Create the client. paho-mqtt 2.x requires a callback API version as the
# first argument; VERSION1 keeps the 4-argument on_connect signature used
# above. Older releases do not define CallbackAPIVersion and take no argument.
_callback_api = getattr(mqtt, 'CallbackAPIVersion', None)
if _callback_api is not None:
    mqtt_client = mqtt.Client(_callback_api.VERSION1)
else:
    mqtt_client = mqtt.Client()

# Register the callbacks before connecting so no event is missed.
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
# host, port, keepalive (seconds)
mqtt_client.connect('127.0.0.1', 1883, 60)

if __name__ == '__main__':
    # Blocks and processes network traffic until the client disconnects.
    mqtt_client.loop_forever()
4.0.1.4 Golang
Demo
package main
import (
"encoding/json"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
uuid "github.com/satori/go.uuid"
"time"
)
// Alarm is one element of the array carried by the "cnc_alarm" data point.
// The struct tags bind each field to its JSON key.
type Alarm struct {
// AlarmType is the alarm category code (JSON key "alarm_type").
AlarmType string `json:"alarm_type,omitempty"`
// AlarmMsg is the alarm text (JSON key "alarm_msg").
AlarmMsg string `json:"alarm_msg,omitempty"`
// AlarmNo is the alarm number (JSON key "alarm_no").
AlarmNo int64 `json:"alarm_no,omitempty"`
}
// Position is one element of an array-valued data point other than
// "cnc_alarm" (for example the per-axis entries of "cnc_mecpos").
type Position struct {
// Axis is the axis label (JSON key "axis").
Axis string `json:"axis,omitempty"`
// Value is the coordinate of the axis (JSON key "value").
Value float64 `json:"value,omitempty"`
}
// Value is a single named data point of a device.
type Value struct {
// Name is the data point name (JSON key "name").
Name string `json:"name,omitempty"`
// Value holds the decoded JSON value; it is left untyped because it may be
// a scalar or an array, and is inspected with a type switch by the handler.
Value interface{} `json:"value,omitempty"`
}
// Root is one device record; a message payload is a JSON array of Root.
type Root struct {
// DeviceId is the device identifier (JSON key "device_id").
DeviceId string `json:"device_id,omitempty"`
// DeviceType is the device type (JSON key "device_type").
DeviceType string `json:"device_type,omitempty"`
// Ts is the record timestamp (JSON key "ts").
Ts int64 `json:"ts,omitempty"`
// Values lists the data points of the device (JSON key "values").
Values []Value `json:"values,omitempty"`
}
// reciveHandler is the subscription callback. It decodes the payload as a
// JSON array of Root records and prints each device id followed by one line
// per data point. Array-valued data points are expanded: "cnc_alarm" entries
// as alarm lines, any other array as axis:value pairs.
//
// Decoding errors are printed instead of being silently discarded, so a
// malformed message is visible but does not stop the client.
func reciveHandler(client mqtt.Client, m mqtt.Message) {
	var messages []Root
	if err := json.Unmarshal(m.Payload(), &messages); err != nil {
		fmt.Println("decode payload:", err)
		return
	}
	for _, message := range messages {
		fmt.Println(message.DeviceId)
		for _, value := range message.Values {
			fmt.Print(value.Name + "\t")
			switch value.Value.(type) {
			case []interface{}:
				// Re-encode the generic slice so it can be decoded again
				// into the concrete element type.
				raw, err := json.Marshal(value.Value)
				if err != nil {
					fmt.Print(err)
					break
				}
				if value.Name == "cnc_alarm" {
					var alarms []Alarm
					if err := json.Unmarshal(raw, &alarms); err != nil {
						fmt.Print(err)
						break
					}
					for _, alarm := range alarms {
						fmt.Printf("\nalarm_no:%d\talarm_type:%s\talarm_msg:%s", alarm.AlarmNo, alarm.AlarmType, alarm.AlarmMsg)
					}
				} else {
					var position []Position
					if err := json.Unmarshal(raw, &position); err != nil {
						fmt.Print(err)
						break
					}
					for _, axis := range position {
						fmt.Printf("%s:%f\t", axis.Axis, axis.Value)
					}
				}
			default:
				fmt.Print(value.Value)
			}
			// Terminate the line of the current data point.
			fmt.Println()
		}
	}
}
// main connects to the broker, subscribes to the data topic of every gateway
// and then idles while reciveHandler processes incoming messages.
func main() {
	// "+" is the single-level wildcard, so this matches any gateway id.
	topic := "data/+/v1"
	// Broker address; replace with the host and port of your own broker.
	address := "tcp://wuyun.pro:1883"

	// A random client id avoids clashing with other running instances.
	u1, err := uuid.NewV4()
	if err != nil {
		panic(err)
	}

	opts := mqtt.NewClientOptions().AddBroker(address).SetClientID(u1.String())
	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	if token := c.Subscribe(topic, 0, reciveHandler); token.Wait() && token.Error() != nil {
		// Without a subscription there is nothing to wait for.
		fmt.Println(token.Error())
		return
	}

	// Keep the process alive. time.Sleep takes a time.Duration counted in
	// nanoseconds, so the interval is spelled with a unit; a bare
	// 1*1000*1000 is only one millisecond.
	for {
		time.Sleep(time.Second)
	}
}