Android 基于Mqtt开发消息推送服务

发布时间:2019-04-26
技术:Sprint Boot + Android + Mqtt +Http

概述

现在很多App都有消息推送服务,这两天研究了一下mqtt协议,做了个Android Demo(可保证每条推送都受到)

详细

一、效果图

image.png


image.pngimage.png


二、项目目录结构图

image.png

image.png

三、Mqtt介绍

        摘自---->http://www.runoob.com/w3cnote/mqtt-intro.html

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

四、Apache Apollo 安装

    在这里不做介绍,详细请看https://www.cnblogs.com/xiaojitui/p/7874654.html

五、实现思路

    1:使用Spring Boot接收前端传来得字符----->调用Mqtt向某个主题push消息----->android连接Mqtt服务器------>订阅相关主题--------->在回调mqtt回调中做相关业务

六、使用Spring Boot 接收前端传来得数据

    1:在这里主要用到@RequestParam注解取到前端传过来得数据,之后调用Mqtt把这个数据推送到android上

    

@Controller
public class BaseRequestController {
	@Autowired
	private MqttPushClient mMqtt;
	
	@RequestMapping("/")
	private String  index() {
		return "index";
	}
	@RequestMapping("api/android/push")
	@ResponseBody
	private ResponseBean<String> push(@RequestParam("pushStr")String pushStr){
		mMqtt.publish(ConstString.ANDROID_PUSH_TOPIC, pushStr);
		System.out.println(pushStr);
		return ResponseUtils.success("",pushStr);
	}
	
}
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title></title>
<link rel="stylesheet" type="text/css" href="css/style.css" />
<link rel="stylesheet" type="text/css" href="css/bootstrap.min.css" />
<script src="js/jquery.min.js" type="text/javascript" charset="utf-8"></script>
<script src="lib/layui/layui.all.js" type="text/javascript" charset="utf-8"></script>
</head>
<body>
	<div class="main-box">
		<div class="left-box">
			<div class="item-left-box">推送</div>
		</div>
		<div class="content">
			<input id="pushText" type="text" />
			<button id="pushButton">推送()</button>
		</div>
		<script type="text/javascript">
			$("#pushButton").click(function() {
				var pushStr = $("#pushText").val();
				var json = {
					"pushStr" : pushStr
				};
				$.post("/api/android/push",
						json,
						function(data, status) {
						if(data.code==0){
							layer.msg('推送成功,请注意查看手机'); 
						}
				});
			});
		</script>
	</div>
</body>
</html>

七、Android接收推送过来得数据

1:首先需要添加依赖

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

2:在AndroidManifest.xml中注册服务,经过测试,如果不加这个,可能没办法连接Mqtt服务器

<service android:name="org.eclipse.paho.android.service.MqttService" />

3:编写连接Mqtt服务器代码 Mqtt.java

public class Mqtt implements MqttCallback ,IMqttActionListener {
    private static Mqtt mMqttServer;
    private MqttAndroidClient mClient;
    private  boolean isConninging;
    private Map<String,Integer> mTopic;
    private MessageCallback callback;

    /**
     * 单列
     * @return
     */
    public static Mqtt getInstance() {
        if(mMqttServer==null){
            synchronized (Mqtt.class){
                if (mMqttServer==null){
                    mMqttServer =new Mqtt();
                }
            }
        }
        return mMqttServer;
    }
    public  void tryConnect(MqttConnectParam param,MessageCallback callback){
        this.callback=callback;
        if (!isConnected()){
            L.i("连接MqttServer -------start");
            connectMqttServer(param);
        }else {
            L.i("Mqtt已经连接");
        }
    }

    /**
     * 连接Mqtt服务器
     */
    private  synchronized  void  connectMqttServer(MqttConnectParam param){
        try {
            if (isConninging){
                Log.i("TAG","Mqtt正在连接");
                return;
            }
            mClient = new MqttAndroidClient(param.getmContext(), param.getmServerUrl(), param.getmId());
            mClient.setCallback(this);
            MqttConnectOptions   conOpt = new MqttConnectOptions();
            // 清除缓存
            conOpt.setCleanSession(true);
            // 设置超时时间,单位:秒
            conOpt.setConnectionTimeout(5 );
            // 心跳包发送间隔,单位:秒
            conOpt.setKeepAliveInterval(5);
            // 用户名
            conOpt.setUserName(param.getmUser());
            // 密码
            conOpt.setPassword(param.getmPass().toCharArray());
            isConninging=true;
            mClient.connect(conOpt,null,this);

        } catch (MqttException e) {
            e.printStackTrace();
            L.i("Mqtt连接失败");
        }
    }

    /**
     *
     * @return 返回Mqtt是否连接
     */
    private boolean isConnected(){
        if (mClient==null)return false;//如果实例==null
        if (!mClient.isConnected())return false;
        return true;
    }

    /**
     * 失去连接
     * @param cause
     */
    @Override
    public void connectionLost(Throwable cause) {
        isConninging=false;
        if (callback!=null){callback.onConnectLost();}
        L.i("连接丢失");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {

       if(callback!=null){callback.onMessage(topic,message);}

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {

    }

    @Override
    public void onSuccess(IMqttToken asyncActionToken) {
        isConninging=false;
        if (callback!=null){callback.onConnected();}
        try {
            mClient.subscribe(getTops(),getQos());
        } catch (MqttException e) {
            e.printStackTrace();
        }

        L.i("Mqtt连接成功");
    }

    @Override
    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
       if(callback!=null){callback.onConnectFailure(asyncActionToken,exception);}
        isConninging=false;
    }

    public Map<String, Integer> getmTopic() {
        return mTopic;
    }

    public void setmTopic(Map<String, Integer> mTopic) {
        this.mTopic = mTopic;
    }
    private String[] getTops(){
        List<String> list =new ArrayList<>();
        for (String key :mTopic.keySet()) {
            list.add(key);
        }
        return list.toArray(new String[list.size()]);
    }
    private int[] getQos(){
        List<Integer> list =new ArrayList<>();
        int[] qos =new int[mTopic.size()];
        for (String key :mTopic.keySet()) {
            list.add(mTopic.get(key));
        }
        Integer[] tq=list.toArray(new Integer[list.size()]);
        for (int i = 0; i < tq.length; i++) {
            qos[i]=tq[i];
        }
        return qos;
    }
    interface  MessageCallback{
        void onMessage(String topic, MqttMessage message);
        void onConnectFailure(IMqttToken asyncActionToken, Throwable exception);
        void onConnected();
        void onConnectLost();
    }

}

4:连接参数MqttConnectParam.java

public class MqttConnectParam {
    private Context mContext;
    private String mServerUrl;
    private String mId;
    private String mUser;
    private String mPass;

    public Context getmContext() {
        return mContext;
    }

    public void setmContext(Context mContext) {
        this.mContext = mContext;
    }

    public String getmServerUrl() {
        return mServerUrl;
    }

    public void setmServerUrl(String mServerUrl) {
        this.mServerUrl = mServerUrl;
    }

    public String getmId() {
        return mId;
    }

    public void setmId(String mId) {
        this.mId = mId;
    }

    public String getmUser() {
        return mUser;
    }

    public void setmUser(String mUser) {
        this.mUser = mUser;
    }

    public String getmPass() {
        return mPass;
    }

    public void setmPass(String mPass) {
        this.mPass = mPass;
    }

    public MqttConnectParam() {

    }

    public MqttConnectParam(Context mContext, String mServerUrl, String mId, String mUser, String mPass) {
        this.mContext = mContext;
        this.mServerUrl = mServerUrl;
        this.mId = mId;
        this.mUser = mUser;
        this.mPass = mPass;
    }
}

八、注册成服务

1:编写Service

public class MqttServer extends Service implements  Mqtt.MessageCallback {
    public  static  final String SERVER_URL="tcp://xxxxxxxxx";//注意这里要修改Mqtt服务器得地址
    public  static  final  String MQTTUSERNAME="user";
    public static  final  String MQTTPASSWORD="password";
    public  static  final  String MQTTID="id";
    private Mqtt mMqtt;
    private IMqttPushCallback mMegCallback;
    private final IBinder mBinder = new MyBind();
    @Override
    public void onCreate() {
        super.onCreate();
        if (mMqtt==null){mMqtt=Mqtt.getInstance();}
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        if (mMqtt==null){mMqtt=Mqtt.getInstance();}
        mMqtt.setmTopic(getTopic());
        mMqtt.tryConnect(getMqttConnectParam(),this);
        return super.onStartCommand(intent, flags, startId);
    }

    @Override
    public IBinder onBind(Intent intent) {
        return mBinder;
    }

    private MqttConnectParam getMqttConnectParam(){
        return  new MqttConnectParam(this,SERVER_URL,MQTTID,MQTTUSERNAME,MQTTPASSWORD);
    }
    private Map<String,Integer> getTopic(){
        Map<String,Integer> resultMap =new HashMap<>();
        resultMap.put(App.ANDROID_PUSH_TOPIC,2);
        return resultMap;
    }

    @Override
    public void onMessage(String topic, MqttMessage message) {
        NotificationState state =NotificationState.getInstance();
        String msg =new String(message.getPayload());
        if (msg.startsWith("\"")){msg =msg.substring(1, msg.length());}
        if (msg.endsWith("\"")){msg =msg.substring(0, msg.length()-1);}
        L.i(msg.startsWith("\""));
        state.notification("通知",msg,null);
        if (mMegCallback!=null){mMegCallback.onPushMessage(msg);}
    }

    @Override
    public void onConnectFailure(IMqttToken asyncActionToken, Throwable exception) {
        L.i("连接失败"+exception.getMessage());
    }

    @Override
    public void onConnected() {
        Toast.makeText(this, "连接成功", Toast.LENGTH_SHORT).show();
    }

    @Override
    public void onConnectLost() {

    }
    public  class  MyBind extends Binder {
      public   void  setCallbacl(IMqttPushCallback callbacl) {
            mMegCallback=callbacl;
        }

    }

}

2:启动服务

Intent intent =new Intent(getApplicationContext(),MqttServer.class);

startService(intent);


本实例支付的费用只是购买源码的费用,如有疑问欢迎在文末留言交流,如需作者在线代码指导、定制等,在作者开启付费服务后,可以点击“购买服务”进行实时联系,请知悉,谢谢
手机上随时阅读、收藏该文章 ?请扫下方二维码