package com.netopsun.w8028devices;

import com.netopsun.deviceshub.base.RxTxCommunicator;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/* loaded from: classes.dex */
/**
 * TCP-based {@link RxTxCommunicator} for W8028 devices.
 *
 * <p>{@link #connectInternal()} opens a socket to the address reported by the
 * owning {@link W8028Devices} and starts a background reader on
 * {@code Schedulers.io()} that forwards every received chunk to
 * {@code onReceiveCallback}. {@link #send(byte[])} writes to the same socket
 * and, when writes keep failing, asks the device's connect handler to
 * reconnect.
 */
public class W8028RxTxCommunicator extends RxTxCommunicator {
    /** Timeout for establishing the TCP connection, in milliseconds. */
    private static final int CONNECT_TIMEOUT_MS = 5000;
    /** Size of the buffer handed to each {@code InputStream.read} call. */
    private static final int RECEIVE_BUFFER_SIZE = 1024;
    /** How long sends must have been failing before a reconnect is requested, in milliseconds. */
    private static final long RECONNECT_AFTER_SILENCE_MS = 3000L;

    /** True between a successful {@link #connectInternal()} and the next detected loss or disconnect. */
    volatile boolean isConnected;
    /**
     * Wall-clock time (ms) of the last successful connect/send, or of the last
     * reconnect request; {@code 0} after {@link #disconnect()}, which suppresses
     * reconnect requests from {@link #send(byte[])}.
     */
    volatile long lastSendSuccessTimes;
    /** Output stream of the current socket; volatile because it is written by the connecting thread and read by senders. */
    private volatile OutputStream outputStream;
    private Disposable receiveTask;
    private Disposable sendBeatTask;
    Socket socket;
    final W8028Devices w8028Devices;

    /**
     * @param w8028Devices device that supplies the target IP/port and the connect handler
     */
    public W8028RxTxCommunicator(W8028Devices w8028Devices) {
        super(w8028Devices);
        this.socket = new Socket();
        this.lastSendSuccessTimes = 0L;
        this.w8028Devices = w8028Devices;
    }

    @Override
    public void connect() {
        super.connect();
    }

    /**
     * Opens a fresh socket to the device and starts the receive loop.
     *
     * @return {@code 0} if already connected or the connection was established,
     *         {@code -1} if connecting failed with an {@link IOException}
     */
    @Override
    public int connectInternal() {
        if (this.isConnected) {
            return 0;
        }
        // Retire the previous reader BEFORE closing its socket: a disposed reader
        // does not touch isConnected, so it cannot clobber the new connection's state.
        disposeIfPresent(this.receiveTask);
        closeSocketQuietly(this.socket);

        final Socket freshSocket = new Socket();
        this.socket = freshSocket;
        try {
            InetSocketAddress target =
                    new InetSocketAddress(this.w8028Devices.getDevicesIP(), this.w8028Devices.getRxtxPort());
            freshSocket.connect(target, CONNECT_TIMEOUT_MS);
            this.lastSendSuccessTimes = System.currentTimeMillis();
            this.outputStream = freshSocket.getOutputStream();
            final InputStream inputStream = freshSocket.getInputStream();

            // Publish the connected state before the reader starts, so a reader that
            // fails immediately is not overwritten by a late "true".
            this.isConnected = true;
            this.receiveTask = Observable.create(new ObservableOnSubscribe<Object>() {
                @Override
                public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                    byte[] buffer = new byte[RECEIVE_BUFFER_SIZE];
                    while (!emitter.isDisposed()) {
                        try {
                            int count = inputStream.read(buffer);
                            if (count < 0) {
                                // End of stream: the peer closed the connection. Without this
                                // check read() keeps returning -1 and the loop spins forever.
                                if (!emitter.isDisposed()) {
                                    W8028RxTxCommunicator.this.isConnected = false;
                                    emitter.onComplete();
                                }
                                return;
                            }
                            if (count > 0) {
                                byte[] payload = Arrays.copyOf(buffer, count);
                                if (W8028RxTxCommunicator.this.onReceiveCallback != null) {
                                    W8028RxTxCommunicator.this.onReceiveCallback.onReceive(payload);
                                }
                            }
                        } catch (Exception e) {
                            if (emitter.isDisposed()) {
                                // Deliberate shutdown (task disposed, socket closed under us):
                                // whoever disposed the task owns the connection state.
                                return;
                            }
                            W8028RxTxCommunicator.this.isConnected = false;
                            emitter.onComplete();
                            e.printStackTrace();
                            return;
                        }
                    }
                }
            }).subscribeOn(Schedulers.io()).subscribe();
            return 0;
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leave a half-opened socket behind until the next attempt.
            closeSocketQuietly(freshSocket);
            return -1;
        }
    }

    /**
     * Stops the heartbeat, clears the last-send timestamp (which disables
     * reconnect requests from {@link #send(byte[])}) and delegates to the
     * superclass.
     */
    @Override
    public void disconnect() {
        this.lastSendSuccessTimes = 0L;
        disposeIfPresent(this.sendBeatTask);
        super.disconnect();
    }

    /**
     * Stops the receive loop, closes the socket and marks the link as down.
     *
     * @return always {@code 0}
     */
    @Override
    public int disconnectInternal() {
        disposeIfPresent(this.receiveTask);
        closeSocketQuietly(this.socket);
        this.isConnected = false;
        return 0;
    }

    /**
     * No-op for this transport.
     *
     * @return always {@code 0}
     */
    @Override
    public int interruptSend() {
        return 0;
    }

    @Override
    public boolean isConnected() {
        return this.isConnected;
    }

    /**
     * Writes and flushes {@code bArr} on the current connection.
     *
     * <p>On an I/O failure, if auto-reconnect is enabled, the communicator has
     * not been explicitly disconnected, and no send has succeeded for more than
     * {@value #RECONNECT_AFTER_SILENCE_MS} ms, the device's connect handler is
     * asked to reconnect; the timestamp is then reset so the request is not
     * repeated on every failing send.
     *
     * @param bArr bytes to send
     * @return {@code 0} on success; {@code -1} if there is no output stream yet,
     *         {@code bArr} is {@code null}, or the write failed
     */
    @Override
    public synchronized int send(byte[] bArr) {
        // Use one snapshot for both write and flush; the field may be replaced by a reconnect.
        final OutputStream out = this.outputStream;
        if (out == null || bArr == null) {
            return -1;
        }
        try {
            out.write(bArr);
            out.flush();
            this.lastSendSuccessTimes = System.currentTimeMillis();
            return 0;
        } catch (IOException e) {
            e.printStackTrace();
            final long lastSuccess = this.lastSendSuccessTimes;
            final long now = System.currentTimeMillis();
            if (lastSuccess != 0 && this.autoReconnect && now - lastSuccess > RECONNECT_AFTER_SILENCE_MS) {
                this.w8028Devices.getConnectHandler().notifyReconnectRxTx();
                this.lastSendSuccessTimes = System.currentTimeMillis();
            }
            return -1;
        }
    }

    /**
     * Starts (or restarts) a periodic task that sends {@code bArr} via
     * {@link #send(byte[])}.
     *
     * @param i    period between heartbeats, in milliseconds
     * @param bArr heartbeat payload
     * @return always {@code true}
     */
    @Override
    public boolean startSendHeartBeatPackage(int i, final byte[] bArr) {
        disposeIfPresent(this.sendBeatTask);
        this.sendBeatTask = Observable.interval(i, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long tick) throws Exception {
                W8028RxTxCommunicator.this.send(bArr);
            }
        });
        return true;
    }

    /** Stops the periodic heartbeat, if one is running. */
    @Override
    public void stopSendHeartBeatPackage() {
        disposeIfPresent(this.sendBeatTask);
    }

    /** Disposes {@code task} unless it is {@code null}. */
    private static void disposeIfPresent(Disposable task) {
        if (task != null) {
            task.dispose();
        }
    }

    /** Closes {@code target} unless it is {@code null}; a failure to close is only logged. */
    private static void closeSocketQuietly(Socket target) {
        if (target == null) {
            return;
        }
        try {
            target.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
