IoT|Android使用的MQTT客户端

Android使用的MQTT客户端,支持订阅、发送消息;
支持创建连接到本地保存;
支持话题消息筛选;

使用视频:https://dwz.cn/undJFEnq
小米应用商店也有 【蘑菇IoT】~
IoT|Android使用的MQTT客户端
文章图片

核心代码贴一下,做个记录

import android.app.Service; import android.content.Context; import android.content.Intent; import android.os.IBinder; import androidx.annotation.Nullable; import com.annimon.stream.Collectors; import com.annimon.stream.Stream; import com.freddon.android.snackkit.extension.regex.RegexHelper; import com.freddon.android.snackkit.extension.tools.NetSuit; import com.freddon.android.snackkit.log.Loger; import com.qiniu.util.StringUtils; import com.sagocloud.ntworker.agent.App; import com.sagocloud.ntworker.agent.RxEventBus; import com.sagocloud.ntworker.mqtt.ActionEventType; import com.sagocloud.ntworker.mqtt.EventType; import com.sagocloud.ntworker.mqtt.bean.MQTTConnectUserEntity; import com.sagocloud.ntworker.mqtt.bean.MQTTMessage; import com.sagocloud.ntworker.mqtt.bean.MqttConnectPoint; import com.sagocloud.ntworker.mqtt.event.MQTTClientActionEvent; import com.sagocloud.ntworker.mqtt.event.MQTTTransferEvent; import com.sagocloud.ntworker.mqtt.event.MQTTMessageEvent; import com.sagocloud.ntworker.mqtt.event.MQTTStateEvent; import com.sagocloud.ntworker.mqtt.event.MQTTTraceEvent; import org.eclipse.paho.android.service.MqttAndroidClient; import org.eclipse.paho.android.service.MqttTraceHandler; import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.Disposable; public class MQTTService extends Service {public final static String CONN = "CON_MQTT_CF"; private MqttAndroidClient mqttAndroidClient; private MQTTMessageEvent mQTTConnectEvent; private MQTTConnectUserEntity connectPoint; private MqttConnectOptions mMqttConnectOptions; private MqttCallback mqttCallback = new MqttCallback() { @Override public void connectionLost(Throwable cause) { mQTTConnectEvent = new MQTTMessageEvent(); mQTTConnectEvent.setType(EventType.connectionLost); Loger.e("?connectionLost:", cause.getMessage()); RxEventBus.post(mQTTConnectEvent); RxEventBus.post(new MQTTStateEvent(App.mqttIsConnected = false)); }@Override public void messageArrived(String topic, MqttMessage message) throws Exception { mQTTConnectEvent = new MQTTMessageEvent(); mQTTConnectEvent.setType(EventType.messageArrived); mQTTConnectEvent.setTopic(topic); mQTTConnectEvent.setMessage(message); Loger.e("??messageArrived:", topic); RxEventBus.post(mQTTConnectEvent); }@Override public void deliveryComplete(IMqttDeliveryToken token) { mQTTConnectEvent = new MQTTMessageEvent(); mQTTConnectEvent.setType(EventType.deliveryComplete); try { mQTTConnectEvent.setTopic(StringUtils.join(token.getTopics(), ",")); mQTTConnectEvent.setMessage(token.getMessage()); } catch (MqttException e) { e.printStackTrace(); } Loger.e("?deliveryComplete:", token.toString()); RxEventBus.post(mQTTConnectEvent); } }; private IMqttActionListener iMqttActionListener = new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { RxEventBus.post(new MQTTTransferEvent(asyncActionToken, null)); Loger.e("?onSuccess:", "" + Arrays.toString(asyncActionToken.getTopics())); App.mqttIsConnected = true; RxEventBus.post(new MQTTStateEvent(true)); DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions(); disconnectedBufferOptions.setBufferEnabled(true); disconnectedBufferOptions.setBufferSize(100); disconnectedBufferOptions.setPersistBuffer(false); disconnectedBufferOptions.setDeleteOldestMessages(false); mqttAndroidClient.setBufferOpts(disconnectedBufferOptions); }@Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { RxEventBus.post(new MQTTTransferEvent(asyncActionToken, exception.getMessage())); Loger.e("?onFailure:", "" + exception.getMessage()); App.mqttIsConnected = false; RxEventBus.post(new MQTTStateEvent(false)); } }; private CompositeDisposable subscription; private MqttTraceHandler traceCallback = new MqttTraceHandler() { @Override public void traceDebug(String tag, String message) { Loger.e("?traceDebug:" + tag, "" + message); //LiveDataBus.post(MQTTTraceEvent.class,new MQTTTraceEvent(MQTTTraceEvent.Type.DEBUG, tag, message)); RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.DEBUG, tag, message)); }@Override public void traceError(String tag, String message) { Loger.e("?traceError:" + tag, "" + message); //LiveDataBus.post(MQTTTraceEvent.class,new MQTTTraceEvent(MQTTTraceEvent.Type.ERROR, tag, message)); RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.ERROR, tag, message)); }@Override public void traceException(String tag, String message, Exception e) { Loger.e("?traceException:" + tag, "" + message + e.getMessage()); RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.EXCEPTION, tag, message)); //LiveDataBus.post(MQTTTraceEvent.class,new MQTTTraceEvent(MQTTTraceEvent.Type.EXCEPTION, tag, message)); } }; private Disposable actSubscription; private Disposable actTimerSubscription; public static void startService(Context context, MQTTConnectUserEntity point) { Intent service = new Intent(); service.setClass(context, MQTTService.class); service.putExtra(CONN, point); context.startService(service); }private void $prepareActionHandler() { if (subscription == null) { subscription = new CompositeDisposable(); } subscription.clear(); if (actSubscription != null && actSubscription.isDisposed()) { actSubscription.dispose(); } if (actTimerSubscription != null && actTimerSubscription.isDisposed()) { actTimerSubscription.dispose(); } actSubscription = RxEventBus.subscribeIOEvent( MQTTClientActionEvent.class, event -> { ActionEventType type = event.getEventType(); Object payload = event.getPayload(); switch (type) { case connect: connect(); break; case publish: if (payload instanceof MQTTMessage) { publish((MQTTMessage) payload); } break; case subscribe: if (payload instanceof MQTTMessage) { subscribe((MQTTMessage) payload); } break; case unsubscribe: if (payload instanceof MQTTMessage) { unsubscribe((MQTTMessage) payload); } break; case unsubscribe_all: if (payload instanceof String[]) { unsubscribeAll((String[]) payload); } break; case close: disconnect(); mqttAndroidClient = null; App.mqttIsConnected = false; RxEventBus.post(new MQTTStateEvent(false)); stopSelf(); break; } }, error -> { Loger.d("error", error.getMessage()); } ); actTimerSubscription = Observable.interval(2000, TimeUnit.MILLISECONDS) .subscribe((i) -> { App.mqttIsConnected = mqttAndroidClient != null && mqttAndroidClient.isConnected(); RxEventBus.post(new MQTTStateEvent(App.mqttIsConnected)); }); subscription.add(actSubscription); subscription.add(actTimerSubscription); }@Override public void onDestroy() { disconnect(); RxEventBus.unsubscribeEvent(subscription); super.onDestroy(); }@Nullable @Override public IBinder onBind(Intent intent) { return null; }@Override public int onStartCommand(Intent intent, int flags, int startId) { if (intent != null) { connectPoint = intent.getParcelableExtra(CONN); if (connectPoint != null) { $prepareActionHandler(); connect(); } else { if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) { connect(); } } } return super.onStartCommand(intent, flags, startId); }private void $prepared(MqttConnectPoint connectPoint) { String serverURI = String.format(Locale.ENGLISH, "%s://%s:%s", connectPoint.isUseSSL() ? "ssl" : "tcp", connectPoint.getHost(), connectPoint.getPort()); if (mqttAndroidClient != null) { disconnect(); } mqttAndroidClient = new MqttAndroidClient(this, serverURI, connectPoint.getClientId()); mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调 mqttAndroidClient.setTraceEnabled(true); mqttAndroidClient.setTraceCallback(traceCallback); mMqttConnectOptions = new MqttConnectOptions(); mMqttConnectOptions.setMqttVersion(connectPoint.getVersion()); mMqttConnectOptions.setMaxInflight(connectPoint.getMaxInflight()); mMqttConnectOptions.setAutomaticReconnect(connectPoint.isAutoReconnect()); mMqttConnectOptions.setCleanSession(connectPoint.isClearSession()); //设置是否清除缓存 mMqttConnectOptions.setConnectionTimeout(connectPoint.getConnectTimeout()); //设置超时时间,单位:秒 mMqttConnectOptions.setKeepAliveInterval(connectPoint.getTickTime()); //设置心跳包发送间隔,单位:秒 if (RegexHelper.isAllNotEmpty(connectPoint.getUserName(), connectPoint.getUserPasswort())) { mMqttConnectOptions.setUserName(connectPoint.getUserName()); //设置用户名 mMqttConnectOptions.setPassword(connectPoint.getUserPasswort().toCharArray()); //设置密码 } if (connectPoint.isUseSSL() && connectPoint.getSslProperties() != null) { mMqttConnectOptions.setSSLProperties(connectPoint.getSslProperties()); } if (RegexHelper.isNotEmpty(connectPoint.getLwt())) { mMqttConnectOptions.setWill(connectPoint.getLwt().getTopic(), connectPoint.getLwt().getMessage().getBytes(), connectPoint.getLwt().getQos(), connectPoint.getLwt().isRetained()); } }private void connect() { if (mqttAndroidClient == null) { $prepared(connectPoint); } if (!mqttAndroidClient.isConnected() && NetSuit.checkEnable(this)) { try { mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener); } catch (MqttException e) { e.printStackTrace(); } }}private void disconnect() { try { if (mqttAndroidClient == null) return; mqttAndroidClient.unregisterResources(); mqttAndroidClient.disconnect(); mqttAndroidClient.close(); } catch (MqttException e) { e.printStackTrace(); } finally { mqttAndroidClient = null; }}private void subscribe(MQTTMessage subscribe) { try { if (mqttAndroidClient == null || subscribe == null || subscribe.getTopic() == null) return; MqttMessage.validateQos(subscribe.getQos()); List sub = connectPoint.getSubTopics(); if (sub == null) { sub = new ArrayList<>(); } Long count = Stream.of(sub) .filter(item -> subscribe.getTopic().equalsIgnoreCase(item.getTopic())) .collect(Collectors.counting()); if (count > 0) { return; } sub.add(subscribe); connectPoint.setSubTopics(sub); mqttAndroidClient.subscribe(subscribe.getTopic(), subscribe.getQos()); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } }private void subscribeAll(String[] topics, int[] qos) { if (RegexHelper.isAnyEmpty(topics, qos)) return; if (mqttAndroidClient == null) return; if (topics.length != qos.length) return; try { mqttAndroidClient.subscribe(topics, qos); } catch (MqttException e) { e.printStackTrace(); } }private void unsubscribe(MQTTMessage subscribe) { try { if (mqttAndroidClient == null || connectPoint == null) return; List sub = connectPoint.getSubTopics(); if (sub != null) { List filtered = Stream.of(sub) .filter(item -> !subscribe.getTopic().equalsIgnoreCase(item.getTopic())) .collect(Collectors.toList()); connectPoint.setSubTopics(filtered); } mqttAndroidClient.unsubscribe(subscribe.getTopic()); } catch (MqttException e) { e.printStackTrace(); } }private void unsubscribeAll(String[] topics) { try { if (mqttAndroidClient == null) return; if (topics == null) mqttAndroidClient.unsubscribe("#"); else { mqttAndroidClient.unsubscribe(topics); } if (connectPoint != null) { connectPoint.setSubTopics(null); } } catch (MqttException e) { e.printStackTrace(); } }private void publish(MQTTMessage subscribe) { try { MqttMessage.validateQos(subscribe.getQos()); mqttAndroidClient.publish(subscribe.getTopic(), subscribe.getMessage().getBytes(), subscribe.getQos(), subscribe.isRetained()); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } }}

【IoT|Android使用的MQTT客户端】

    推荐阅读