Android 基于Mqtt开发消息推送服务
概述
详细
一、效果图
二、项目目录结构图
三、Mqtt介绍
摘自---->http://www.runoob.com/w3cnote/mqtt-intro.html
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛,在很多情况下都能使用,包括受限的环境,如:机器与机器(M2M)通信和物联网(IoT)。它已广泛应用于通过卫星链路通信的传感器、偶尔拨号的医疗设备、智能家居及一些小型化设备中。
四、Apache Apollo 安装
在这里不做介绍,详细请看https://www.cnblogs.com/xiaojitui/p/7874654.html
五、实现思路
1:使用Spring Boot接收前端传来的字符串----->调用Mqtt向某个主题push消息----->android连接Mqtt服务器------>订阅相关主题--------->在Mqtt回调中处理相关业务
六、使用Spring Boot 接收前端传来的数据
1:在这里主要用到@RequestParam注解取到前端传过来的数据,之后调用Mqtt把这个数据推送到android上
/**
 * Web entry points for the push demo: serves the push page and forwards the
 * text submitted by that page to the Android push topic over MQTT.
 */
@Controller
public class BaseRequestController {

    @Autowired
    private MqttPushClient mMqtt;

    // Handler methods are public on purpose: if this controller is ever wrapped
    // in a class-based proxy (e.g. by AOP), a private handler would be invoked on
    // the proxy instance itself, where the @Autowired field is not populated.

    /**
     * Serves the push page.
     *
     * @return the view name {@code "index"}
     */
    @RequestMapping("/")
    public String index() {
        return "index";
    }

    /**
     * Publishes the submitted text to {@code ConstString.ANDROID_PUSH_TOPIC}
     * and echoes it back to the caller.
     *
     * @param pushStr text read from the {@code pushStr} request parameter
     * @return the result of {@code ResponseUtils.success("", pushStr)}
     */
    @RequestMapping("api/android/push")
    @ResponseBody
    public ResponseBean<String> push(@RequestParam("pushStr") String pushStr) {
        mMqtt.publish(ConstString.ANDROID_PUSH_TOPIC, pushStr);
        System.out.println(pushStr);
        return ResponseUtils.success("", pushStr);
    }
}
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8" />
    <title></title>
    <link rel="stylesheet" type="text/css" href="css/style.css" />
    <link rel="stylesheet" type="text/css" href="css/bootstrap.min.css" />
    <script src="js/jquery.min.js" type="text/javascript" charset="utf-8"></script>
    <script src="lib/layui/layui.all.js" type="text/javascript" charset="utf-8"></script>
</head>
<body>
    <div class="main-box">
        <div class="left-box">
            <div class="item-left-box">推送</div>
        </div>
        <!-- Push form: the text typed here is sent to the server as "pushStr". -->
        <div class="content">
            <input id="pushText" type="text" />
            <button id="pushButton">推送()</button>
        </div>
        <script type="text/javascript">
            // On click, POST the input text to the push endpoint and report the outcome.
            $("#pushButton").click(function() {
                var pushStr = $("#pushText").val();
                var json = {
                    "pushStr": pushStr
                };
                $.post("/api/android/push", json, function(data, status) {
                    if (data.code == 0) {
                        layer.msg('推送成功,请注意查看手机');
                    } else {
                        // The server answered but reported a non-zero result code.
                        layer.msg('推送失败');
                    }
                }).fail(function() {
                    // Transport-level failure (network error, non-2xx status, ...).
                    layer.msg('推送失败');
                });
            });
        </script>
    </div>
</body>
</html>
七、Android接收推送过来的数据
1:首先需要添加依赖
// Paho MQTT client library and its Android service wrapper.
// One dependency per line: Groovy would otherwise read both as a single command chain.
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
2:在AndroidManifest.xml中注册服务,经过测试,如果不加这个,可能没办法连接Mqtt服务器
<!-- Registers the Paho Android MQTT service. Per the article's own testing, the client may be unable to connect to the MQTT server without this entry. -->
<service android:name="org.eclipse.paho.android.service.MqttService" />
3:编写连接Mqtt服务器代码 Mqtt.java
public class Mqtt implements MqttCallback ,IMqttActionListener { private static Mqtt mMqttServer; private MqttAndroidClient mClient; private boolean isConninging; private Map<String,Integer> mTopic; private MessageCallback callback; /** * 单列 * @return */ public static Mqtt getInstance() { if(mMqttServer==null){ synchronized (Mqtt.class){ if (mMqttServer==null){ mMqttServer =new Mqtt(); } } } return mMqttServer; } public void tryConnect(MqttConnectParam param,MessageCallback callback){ this.callback=callback; if (!isConnected()){ L.i("连接MqttServer -------start"); connectMqttServer(param); }else { L.i("Mqtt已经连接"); } } /** * 连接Mqtt服务器 */ private synchronized void connectMqttServer(MqttConnectParam param){ try { if (isConninging){ Log.i("TAG","Mqtt正在连接"); return; } mClient = new MqttAndroidClient(param.getmContext(), param.getmServerUrl(), param.getmId()); mClient.setCallback(this); MqttConnectOptions conOpt = new MqttConnectOptions(); // 清除缓存 conOpt.setCleanSession(true); // 设置超时时间,单位:秒 conOpt.setConnectionTimeout(5 ); // 心跳包发送间隔,单位:秒 conOpt.setKeepAliveInterval(5); // 用户名 conOpt.setUserName(param.getmUser()); // 密码 conOpt.setPassword(param.getmPass().toCharArray()); isConninging=true; mClient.connect(conOpt,null,this); } catch (MqttException e) { e.printStackTrace(); L.i("Mqtt连接失败"); } } /** * * @return 返回Mqtt是否连接 */ private boolean isConnected(){ if (mClient==null)return false;//如果实例==null if (!mClient.isConnected())return false; return true; } /** * 失去连接 * @param cause */ @Override public void connectionLost(Throwable cause) { isConninging=false; if (callback!=null){callback.onConnectLost();} L.i("连接丢失"); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { if(callback!=null){callback.onMessage(topic,message);} } @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void onSuccess(IMqttToken asyncActionToken) { isConninging=false; if (callback!=null){callback.onConnected();} try { 
mClient.subscribe(getTops(),getQos()); } catch (MqttException e) { e.printStackTrace(); } L.i("Mqtt连接成功"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { if(callback!=null){callback.onConnectFailure(asyncActionToken,exception);} isConninging=false; } public Map<String, Integer> getmTopic() { return mTopic; } public void setmTopic(Map<String, Integer> mTopic) { this.mTopic = mTopic; } private String[] getTops(){ List<String> list =new ArrayList<>(); for (String key :mTopic.keySet()) { list.add(key); } return list.toArray(new String[list.size()]); } private int[] getQos(){ List<Integer> list =new ArrayList<>(); int[] qos =new int[mTopic.size()]; for (String key :mTopic.keySet()) { list.add(mTopic.get(key)); } Integer[] tq=list.toArray(new Integer[list.size()]); for (int i = 0; i < tq.length; i++) { qos[i]=tq[i]; } return qos; } interface MessageCallback{ void onMessage(String topic, MqttMessage message); void onConnectFailure(IMqttToken asyncActionToken, Throwable exception); void onConnected(); void onConnectLost(); } }
4:连接参数MqttConnectParam.java
/**
 * Mutable holder for the settings needed to open an MQTT connection:
 * Android context, server URL, client id, user name and password.
 */
public class MqttConnectParam {
    private Context mContext;
    private String mServerUrl;
    private String mId;
    private String mUser;
    private String mPass;

    /** Creates an empty parameter object; populate it through the setters. */
    public MqttConnectParam() {
    }

    /**
     * Creates a fully populated parameter object.
     *
     * @param mContext   Android context
     * @param mServerUrl MQTT server URL
     * @param mId        client id
     * @param mUser      user name
     * @param mPass      password
     */
    public MqttConnectParam(Context mContext, String mServerUrl, String mId,
                            String mUser, String mPass) {
        this.mContext = mContext;
        this.mServerUrl = mServerUrl;
        this.mId = mId;
        this.mUser = mUser;
        this.mPass = mPass;
    }

    /** @return the Android context */
    public Context getmContext() {
        return this.mContext;
    }

    /** @param mContext the Android context */
    public void setmContext(Context mContext) {
        this.mContext = mContext;
    }

    /** @return the MQTT server URL */
    public String getmServerUrl() {
        return this.mServerUrl;
    }

    /** @param mServerUrl the MQTT server URL */
    public void setmServerUrl(String mServerUrl) {
        this.mServerUrl = mServerUrl;
    }

    /** @return the client id */
    public String getmId() {
        return this.mId;
    }

    /** @param mId the client id */
    public void setmId(String mId) {
        this.mId = mId;
    }

    /** @return the user name */
    public String getmUser() {
        return this.mUser;
    }

    /** @param mUser the user name */
    public void setmUser(String mUser) {
        this.mUser = mUser;
    }

    /** @return the password */
    public String getmPass() {
        return this.mPass;
    }

    /** @param mPass the password */
    public void setmPass(String mPass) {
        this.mPass = mPass;
    }
}
八、注册成服务
1:编写Service
public class MqttServer extends Service implements Mqtt.MessageCallback { public static final String SERVER_URL="tcp://xxxxxxxxx";//注意这里要修改Mqtt服务器得地址 public static final String MQTTUSERNAME="user"; public static final String MQTTPASSWORD="password"; public static final String MQTTID="id"; private Mqtt mMqtt; private IMqttPushCallback mMegCallback; private final IBinder mBinder = new MyBind(); @Override public void onCreate() { super.onCreate(); if (mMqtt==null){mMqtt=Mqtt.getInstance();} } @Override public int onStartCommand(Intent intent, int flags, int startId) { if (mMqtt==null){mMqtt=Mqtt.getInstance();} mMqtt.setmTopic(getTopic()); mMqtt.tryConnect(getMqttConnectParam(),this); return super.onStartCommand(intent, flags, startId); } @Override public IBinder onBind(Intent intent) { return mBinder; } private MqttConnectParam getMqttConnectParam(){ return new MqttConnectParam(this,SERVER_URL,MQTTID,MQTTUSERNAME,MQTTPASSWORD); } private Map<String,Integer> getTopic(){ Map<String,Integer> resultMap =new HashMap<>(); resultMap.put(App.ANDROID_PUSH_TOPIC,2); return resultMap; } @Override public void onMessage(String topic, MqttMessage message) { NotificationState state =NotificationState.getInstance(); String msg =new String(message.getPayload()); if (msg.startsWith("\"")){msg =msg.substring(1, msg.length());} if (msg.endsWith("\"")){msg =msg.substring(0, msg.length()-1);} L.i(msg.startsWith("\"")); state.notification("通知",msg,null); if (mMegCallback!=null){mMegCallback.onPushMessage(msg);} } @Override public void onConnectFailure(IMqttToken asyncActionToken, Throwable exception) { L.i("连接失败"+exception.getMessage()); } @Override public void onConnected() { Toast.makeText(this, "连接成功", Toast.LENGTH_SHORT).show(); } @Override public void onConnectLost() { } public class MyBind extends Binder { public void setCallbacl(IMqttPushCallback callbacl) { mMegCallback=callbacl; } } }
2:启动服务
Intent intent =new Intent(getApplicationContext(),MqttServer.class); startService(intent);