Commit 7b98e668 authored by 马乐's avatar 马乐

1.增加TCP通信功能

2.增加app设置参数及设备配置
parent dec5de28
package com.waytous.anticollision.concurrent
import com.blankj.utilcode.util.LogUtils
import com.waytous.anticollision.BuildConfig
import com.waytous.anticollision.tcp.Error
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.Lock
class Signal(private val lock: Lock, private val count:Int) {
private val condition: Condition = lock.newCondition()
val flag by lazy {
AtomicReference(Error.NOError)
}
val retryCount:AtomicInteger = AtomicInteger(count)
val mResult by lazy {
AtomicReference(Result.Timeout)
}
enum class Result(val value:Int){
Invalid(-1),
Success(0),
Timeout(1),
Cancelled(2),
}
fun taskWait(time:Long){
lock.lock()
try {
condition.await()
} catch (e:InterruptedException) {
if (BuildConfig.DEBUG) {
LogUtils.e("send data error:${e.message}")
} else {
LogUtils.file("send data error:${e.message}")
}
}finally {
lock.unlock()
}
}
}
\ No newline at end of file
package com.waytous.anticollision.config
import com.blankj.utilcode.util.DeviceUtils
import com.waytous.anticollision.utils.PreferenceDelegate
import io.github.toggery.jt808.messagebody.B0100
/**
* 设备配置参数
* */
object DeviceConfig {
/**
* 省份ID
* */
var province: Int by PreferenceDelegate("province", 11)
/**
* 城市ID
* */
var city: Int by PreferenceDelegate("city", 0)
/**
* 车牌颜色
* */
var plateColor: Int by PreferenceDelegate("plateColor", B0100.PLATE_COLOR_YELLOW)
/**
* 设备制造商
* */
var maker: String by PreferenceDelegate("maker", DeviceUtils.getManufacturer())
/**
* 手机号码
* */
var phoneNumber: String by PreferenceDelegate("phone", "")
object HostConfig {
/**
* 服务器地址
* */
var host: String by PreferenceDelegate("host", "")
/**
* 服务器端口
* */
var port: Int by PreferenceDelegate("port", 0)
}
}
\ No newline at end of file
package com.waytous.anticollision.config
import com.waytous.anticollision.utils.PreferenceDelegate
/**
* 设置
* */
object Settings {
/**
* 司机姓名
* */
var driverName: String = ""
@Synchronized set
/**
* 车牌号
* */
var plateNo: String = ""
@Synchronized set
/**
* 设备唯一识别码
* */
var deviceId: String by PreferenceDelegate("deviceId", "")
/**
* 是否已注册
* */
var isRegistered: Boolean by PreferenceDelegate("isRegistered", false)
/**
* 设备注册鉴权码
* */
var token: String by PreferenceDelegate("token", "")
}
\ No newline at end of file
package com.waytous.anticollision.tcp
import io.netty.buffer.ByteBuf
interface ConnectListener {
/**
* 接收到响应数据
* @param buf
* */
fun onDataReceived(buf: ByteBuf)
/**
* 链接服务器成功
* */
fun onConnect()
/**
* 和服务器断开链接
* */
fun onDisconnect(error: Error)
}
\ No newline at end of file
package com.waytous.anticollision.tcp
/**
* tcp通信错误定义
* */
enum class Error {
NOError,
IOError,
StreamClosed,
ConnectionRefused,
Timeout,
NotConnected,
UserDisconnected,
InvalidParam,
ServerUnknownError,
JT808EncodeError,
JT808DecodeError
}
\ No newline at end of file
package com.waytous.anticollision.tcp
import com.blankj.utilcode.util.DeviceUtils
import com.blankj.utilcode.util.LogUtils
import com.waytous.anticollision.BuildConfig
import com.waytous.anticollision.config.DeviceConfig
import com.waytous.anticollision.config.Settings
import com.waytous.anticollision.utils.NamedThreadFactory
import io.github.toggery.jt808.codec.*
import io.github.toggery.jt808.messagebody.B0100
import io.github.toggery.jt808.messagebody.B0102
import io.github.toggery.jt808.messagebody.B8001
import io.github.toggery.jt808.messagebody.HexUtil
import io.netty.buffer.ByteBuf
import io.netty.buffer.UnpooledByteBufAllocator
import io.netty.util.DefaultAttributeMap
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
/**
* 最大尝试链接次数
* */
const val CONNECT_MAX_RETRY = 3
/**
* 设备状态
* @author male
* */
enum class DeviceStatus {
/**
* 未鉴权
* */
UnAuthenticate,
/**
* 正在鉴权
* */
Authenticating,
/**
* 已鉴权
* */
Authenticated
}
/**
* jt808网络会话管理
* @author male
* */
class Session : ConnectListener, SyncMessageListener<Codec<*>> {
/**
* 消息内容解析器
* */
private val mParser by lazy {
SyncParser(this)
}
/**
* 开启新线程,将接收到的字节流交给消息内容解析器解析
* */
private val parseExecutor by lazy {
Executors.newSingleThreadExecutor(NamedThreadFactory("Parser"))
}
/**
* 设备状态
* */
private val deviceStatus by lazy {
AtomicReference(DeviceStatus.UnAuthenticate)
}
/**
* tcp链接管理者
* */
private val tcpManager by lazy {
TcpManager(this, 2)
}
private val buffs by lazy {
mutableListOf<ByteBuf>()
}
private val attributeMap by lazy {
DefaultAttributeMap()
}
private val sheduler by lazy {
Executors.newScheduledThreadPool(1,NamedThreadFactory("Heartbeat"))
}
override fun onDataReceived(buf: ByteBuf) {
parseExecutor.execute { mParser.parse(buf) }
}
override fun onConnect() {
if (deviceStatus.get() == DeviceStatus.UnAuthenticate) {
tcpManager.disconnect()
return
}
var retryTimes = 0
var success = false
val isRegistered= Settings.deviceId == DeviceUtils.getUniqueDeviceId() && Settings.token.isNotEmpty() && Settings.isRegistered
if (isRegistered) {
do {
success = doAuthenticate()
retryTimes++
} while (retryTimes < CONNECT_MAX_RETRY && success)
if (!success) {
handleDisconnect(Error.JT808EncodeError)
}
} else {
do {
success = sendDeviceSignUpMessage()
retryTimes++
} while (retryTimes < CONNECT_MAX_RETRY && success)
if (!success) {
handleDisconnect(Error.JT808EncodeError)
}
}
}
private fun handleDisconnect(error: Error){
when(error){
Error.IOError,Error.StreamClosed,Error.Timeout,Error.NotConnected ->{
}
else -> {}
}
}
fun login(){
if (Settings.isRegistered) {
doAuthenticate()
} else {
sendDeviceSignUpMessage()
}
}
/**
* 设备鉴权
* */
private fun doAuthenticate():Boolean{
val b0102 = B0102().apply {
token = Settings.token
}
val request = Message.of(0x0100, b0102).apply {
simNo = DeviceConfig.phoneNumber
}
Message.encode(
request, MessageMetadata.inbounds(), attributeMap,
UnpooledByteBufAllocator.DEFAULT::buffer, buffs::add
)
return doSendMessage("Authenticate Message")
}
/**
* 发送设备注册消息
* */
private fun sendDeviceSignUpMessage(): Boolean {
Settings.isRegistered = false
Settings.deviceId = ""
val b0100 = B0100().apply {
province = DeviceConfig.province
city = DeviceConfig.city
model = DeviceUtils.getModel()
plateColor = B0100.PLATE_COLOR_YELLOW
maker = DeviceUtils.getManufacturer()
id = DeviceUtils.getUniqueDeviceId()
}
val request = Message.of(0x0100, b0100).apply {
simNo = DeviceConfig.phoneNumber
}
Message.encode(
request, MessageMetadata.inbounds(), attributeMap,
UnpooledByteBufAllocator.DEFAULT::buffer, buffs::add
)
return doSendMessage("SignUp Message")
}
/**
* 开始心跳
* */
private fun startHeartbeat(){
}
/**
* 结束心跳
* */
private fun stopHeartbeat(){
}
/**
* 发送心跳
* */
private fun sendPing(){
val request = Message.of(0x0002).apply {
simNo = DeviceConfig.phoneNumber
}
Message.encode(
request, MessageMetadata.inbounds(), attributeMap,
UnpooledByteBufAllocator.DEFAULT::buffer, buffs::add
)
doSendMessage("HeartBeat message")
}
private fun doSendMessage(messageType:String) = if (buffs.isNotEmpty()) {
val byteArray = ByteArray(buffs[0].readableBytes())
buffs[0].readBytes(byteArray)
if (BuildConfig.DEBUG) {
LogUtils.file("sending $messageType:${HexUtil.dump(byteArray)}")
} else {
LogUtils.d("sending $messageType:${HexUtil.dump(byteArray)}")
}
tcpManager.send(byteArray)
buffs.removeAt(0)
true
} else {
if (BuildConfig.DEBUG) {
LogUtils.d("sending $messageType:buffs is empty,Message encode failed!")
} else {
LogUtils.file("sending $messageType:buffs is empty,Message encode failed!")
}
false
}
override fun onDisconnect(error: Error) {
TODO("Not yet implemented")
}
override fun onRegistered(data: Codec<*>) {
Settings.deviceId = DeviceUtils.getUniqueDeviceId()
Settings.isRegistered = true
Settings.token = (data as B8100Codec).newInstance().token
doAuthenticate()
}
override fun onAuthenticated(data: Codec<*>) {
val codec = (data as B8001Codec).newInstance()
when(val code = codec.result){
B8001.RESULT_SUCCESSFUL->{
deviceStatus.set(DeviceStatus.Authenticated)
}
}
}
override fun onCommonResponse(data: Codec<*>) {
val codec = (data as B8001Codec).newInstance()
when(codec.result){
B8001.RESULT_SUCCESSFUL->{
}
}
}
}
\ No newline at end of file
package com.waytous.anticollision.tcp
/**
* 消息同步监听器
* */
interface SyncMessageListener<T> {
/**
* 设备注册
* @param data
* */
fun onRegistered(data:T)
/**
* 设备鉴权
* @param data
* */
fun onAuthenticated(data:T)
/**
* 平台通用应答
* @param data
* */
fun onCommonResponse(data:T)
}
\ No newline at end of file
package com.waytous.anticollision.tcp
import com.blankj.utilcode.util.LogUtils
import com.waytous.anticollision.BuildConfig
import io.github.toggery.jt808.codec.*
import io.netty.buffer.ByteBuf
import io.netty.util.DefaultAttributeMap
import io.netty.util.ReferenceCountUtil
/**
* 应答消息解析器,将JT808编解码器解码后的消息交给该类进行处理&分类
*
* @author male
* */
class SyncParser(private val syncMessageListener: SyncMessageListener<Codec<*>>) {
/**
* 解析字节流
*
* */
fun parse(buf: ByteBuf){
try {
val message: Message<Codec<*>> = Message.decode(buf, MessageMetadata.outbounds(), DefaultAttributeMap())
when(val codec:Codec<*> = message.body){
is B8001Codec -> syncMessageListener.onCommonResponse(codec)
is B8100Codec -> syncMessageListener.onRegistered(codec)
}
LogUtils.d(message.toString())
} catch (e: Exception) {
if (BuildConfig.DEBUG) {
LogUtils.e("parse received data error:${e.message}")
} else {
LogUtils.file("parse received data error:${e.message}")
}
}finally {
ReferenceCountUtil.release(buf)
}
}
}
\ No newline at end of file
package com.waytous.anticollision.tcp
import com.blankj.utilcode.util.LogUtils
import com.waytous.anticollision.BuildConfig
import com.waytous.anticollision.utils.NamedThreadFactory
import io.netty.buffer.UnpooledByteBufAllocator
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import java.net.InetSocketAddress
import java.net.Socket
import java.net.SocketTimeoutException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock
/**
* 缓冲区8M
* */
const val BUFFER_SIZE = 8192
/**
* 默认超时时间
* */
const val DEFAULT_TIMEOUT = 32
/**
* 每次希望读取的数据长度
* */
const val READ_SIZE = 1024
/**
* 链接状态
* @author male
* */
enum class ConnectStatus {
/**
* 链接断开
* */
Disconnected,
/**
* 正在链接
* */
Connecting,
/**
* 已经链接到服务器
* */
Connected
}
/**
* tcp链接管理类
* @param connectListener
* @param receiveInterval
* @author male
* */
internal class TcpManager(
private val connectListener: ConnectListener,
private val receiveInterval: Int = 2
) {
/**
* 接收线程
* */
private val receiveExecutor by lazy {
Executors.newSingleThreadExecutor(NamedThreadFactory("Receive"))
}
/**
* 发送线程
* */
private val sendExecutors: ExecutorService by lazy {
Executors.newCachedThreadPool(NamedThreadFactory("Send"))
}
/**
* 一条tcp链接
* */
private val connection by lazy {
Connection()
}
private val mCancel by lazy {
AtomicBoolean(true)
}
/**
* access tcp server
* @param host
* @param port
* @param timeout
* */
fun connect(host: String, port: Int, timeout: Int = 40) {
receiveExecutor.execute {
val errCode = connection.connect(host, port, timeout)
if (errCode != Error.NOError) {
connectListener.onDisconnect(errCode)
return@execute
}
connectListener.onConnect()
mCancel.set(false)
receive(receiveInterval)
}
}
/**
* disconnect from tcp server
* @param error
* */
fun disconnect(error: Error = Error.UserDisconnected) {
if (connection.connectStatus.get() == ConnectStatus.Disconnected) {
return
}
receiveExecutor.execute {
connection.disconnect()
mCancel.set(true)
connectListener.onDisconnect(error)
}
}
/**
* 发送消息
* @param data
* */
fun send(data: ByteArray) {
if (connection.connectStatus.get() == ConnectStatus.Connected) {
sendExecutors.execute { connection.send(data) }
}
}
/**
* 发送消息
* @param timeout
* */
private fun receive(timeout: Int) {
if (mCancel.get()) {
return
}
val (errCode, length) = connection.receive()
if (errCode == Error.NOError) {
val buf = UnpooledByteBufAllocator.DEFAULT.buffer(length)
buf.writeBytes(connection.buffer, 0, length)
connectListener.onDataReceived(buf)
connection.buffer.fill(0, 0, length - 1)
receiveExecutor.execute { receive(timeout) }
} else {
mCancel.set(true)
connectListener.onDisconnect(errCode)
}
}
data class ReceivedResult(val error: Error, val length: Int)
/**
* tcp链接
* */
class Connection {
private val TAG = "Connection"
/**
* 接收线程锁
* */
private val receiveLock by lazy {
ReentrantLock()
}
/**
* 发送线程锁
* */
private val sendLock by lazy {
ReentrantLock()
}
private lateinit var socket: Socket
/**
* 数据接收流
* */
private lateinit var inputStream: InputStream
/**
* 数据发送流
* */
private lateinit var outputStream: OutputStream
/**
* 链接状态
* */
val connectStatus by lazy {
AtomicReference(ConnectStatus.Disconnected)
}
/**
* 接收缓冲区
* */
val buffer by lazy {
ByteArray(BUFFER_SIZE)
}
/**
* 链接到指定服务器
* @param host
* @param port
* @param timeout
* @return Error
* */
fun connect(host: String, port: Int, timeout: Int, receiveInterval: Int = 2): Error = try {
receiveLock.lock()
sendLock.lock()
var errorCode: Error = Error.NOError
if (socket != null && socket.isConnected && connectStatus.get() > ConnectStatus.Disconnected) {
errorCode = Error.NOError
} else if (socket != null) {
closeSocket()
}
connectStatus.set(ConnectStatus.Connecting)
socket = Socket().apply {
soTimeout = receiveInterval
keepAlive = true
receiveBufferSize = BUFFER_SIZE
}
socket.connect(
InetSocketAddress(host, port),
if (timeout > 0) timeout else DEFAULT_TIMEOUT
)
inputStream = socket.getInputStream()
outputStream = socket.getOutputStream()
connectStatus.set(ConnectStatus.Connected)
errorCode
} catch (e: IOException) {
connectStatus.set(ConnectStatus.Disconnected)
Error.IOError
} catch (e: SocketTimeoutException) {
connectStatus.set(ConnectStatus.Disconnected)
Error.Timeout
} catch (e: IllegalArgumentException) {
connectStatus.set(ConnectStatus.Disconnected)
Error.InvalidParam
} catch (e: Exception) {
connectStatus.set(ConnectStatus.Disconnected)
Error.ServerUnknownError
} finally {
sendLock.unlock()
receiveLock.unlock()
}
/**
* 发送数据
* @param data
* @return Error
* */
fun send(data: ByteArray): Error {
if (data.isEmpty()) return Error.NOError
try {
sendLock.lock()
outputStream.write(data)
outputStream.flush()
} catch (e: IOException) {
if (BuildConfig.DEBUG) {
LogUtils.e("send data error:${e.message}")
} else {
LogUtils.file("send data error:${e.message}")
}
} catch (e: Exception) {
if (BuildConfig.DEBUG) {
LogUtils.e("send data error:${e.message}")
} else {
LogUtils.file("send data error:${e.message}")
}
} finally {
sendLock.unlock()
}
return Error.NOError
}
private fun closeSocket() {
socket.close()
}
/**
* 接收数据
* @return ReceivedResult
* */
fun receive(): ReceivedResult {
try {
receiveLock.lock()
if (connectStatus.get() != ConnectStatus.Connected) return ReceivedResult(
Error.NotConnected,
0
)
if (inputStream.available() < 0) return ReceivedResult(Error.NOError, 0)
val len = inputStream.read(buffer)
return if (len > 0) {
ReceivedResult(Error.NOError, len)
} else {
if (len == 0) {
connectStatus.set(ConnectStatus.Disconnected)
ReceivedResult(Error.StreamClosed, len)
} else {
cleanup()
connectStatus.set(ConnectStatus.Disconnected)
ReceivedResult(Error.IOError, len)
}
}
} catch (e: IOException) {
connectStatus.set(ConnectStatus.Disconnected)
return ReceivedResult(Error.IOError, -1)
} catch (e: IllegalArgumentException) {
connectStatus.set(ConnectStatus.Disconnected)
return ReceivedResult(Error.InvalidParam, 0)
} catch (e: Exception) {
connectStatus.set(ConnectStatus.Disconnected)
return ReceivedResult(Error.ServerUnknownError, 0)
} finally {
receiveLock.unlock()
}
}
/**
* 断开链接
* */
fun disconnect() = cleanup()
/**
* 断开链接
* */
private fun cleanup() = try {
receiveLock.lock()
sendLock.lock()
closeSocket()
connectStatus.set(ConnectStatus.Disconnected)
} catch (e: Exception) {
if (BuildConfig.DEBUG) {
LogUtils.e("cleanup error:${e.message}")
} else {
LogUtils.file("cleanup error:${e.message}")
}
} finally {
sendLock.unlock()
receiveLock.unlock()
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment