Commit 02a36148 authored by 马乐's avatar 马乐

1.完善登录流程

parent 7b98e668
package com.waytous.anticollision.concurrent
import com.blankj.utilcode.util.LogUtils
import com.waytous.anticollision.BuildConfig
import com.waytous.anticollision.tcp.Error
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.Lock
class Signal(private val lock: Lock, private val count:Int) {
private val condition: Condition = lock.newCondition()
val flag by lazy {
AtomicReference(Error.NOError)
}
val retryCount:AtomicInteger = AtomicInteger(count)
val mResult by lazy {
AtomicReference(Result.Timeout)
}
enum class Result(val value:Int){
Invalid(-1),
Success(0),
Timeout(1),
Cancelled(2),
}
fun taskWait(time:Long){
lock.lock()
try {
condition.await()
} catch (e:InterruptedException) {
if (BuildConfig.DEBUG) {
LogUtils.e("send data error:${e.message}")
} else {
LogUtils.file("send data error:${e.message}")
}
}finally {
lock.unlock()
}
}
}
\ No newline at end of file
package com.waytous.anticollision.listener
import com.waytous.anticollision.tcp.Error
import com.waytous.anticollision.utils.ConnectStatus
interface SessionListener {
/**
* 连接状态发生变化
* */
fun onConnectStatusChanged(status: ConnectStatus)
/**
* 用户登录成功
* */
fun onUserSignIn()
/**
* 用户登录失败
* */
fun onUserSignOut(error: Error)
}
\ No newline at end of file
package com.waytous.anticollision.tcp
/**
* tcp通信错误定义
* */
enum class Error {
NOError,
IOError,
StreamClosed,
ConnectionRefused,
Timeout,
NotConnected,
UserDisconnected,
InvalidParam,
ServerUnknownError,
JT808EncodeError,
JT808DecodeError
}
\ No newline at end of file
package com.waytous.anticollision.tcp
import com.blankj.utilcode.util.DeviceUtils
import com.blankj.utilcode.util.LogUtils
import com.waytous.anticollision.BuildConfig
import com.waytous.anticollision.config.DeviceConfig
import com.waytous.anticollision.config.Settings
import com.waytous.anticollision.listener.SessionListener
import com.waytous.anticollision.utils.NamedThreadFactory
import com.waytous.anticollision.utils.SignInStatus
import com.waytous.anticollision.utils.logd
import com.waytous.anticollision.utils.loge
import io.github.toggery.jt808.codec.*
import io.github.toggery.jt808.messagebody.B0100
import io.github.toggery.jt808.messagebody.B0102
......@@ -15,7 +17,10 @@ import io.netty.buffer.ByteBuf
import io.netty.buffer.UnpooledByteBufAllocator
import io.netty.util.DefaultAttributeMap
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock
/**
* 最大尝试链接次数
......@@ -26,11 +31,23 @@ const val CONNECT_MAX_RETRY = 3
* 设备状态
* @author male
* */
enum class DeviceStatus {
internal enum class DeviceStatus {
/**
* 未注册
* */
Unregistered,
/**
* 注册中
* */
Registering,
/**
* 已册中
* */
Registered,
/**
* 未鉴权
* */
UnAuthenticate,
UnAuthenticated,
/**
* 正在鉴权
......@@ -43,11 +60,21 @@ enum class DeviceStatus {
Authenticated
}
internal class Schedule{
companion object{
const val tcpReadIntervalSecs:Long = 2 // 2 seconds
const val tcpConnectTimeoutSecs = 40 // 40 seconds
const val signInTimeoutSecs:Long = 60 // 60 seconds
const val heartbeatInterval = 120 // 120 seconds
}
}
/**
* jt808网络会话管理
* @author male
* */
class Session : ConnectListener, SyncMessageListener<Codec<*>> {
internal class Session : ConnectListener, SyncMessageListener<Codec<*>> {
/**
* 消息内容解析器
......@@ -67,7 +94,7 @@ class Session : ConnectListener, SyncMessageListener<Codec<*>> {
* 设备状态
* */
private val deviceStatus by lazy {
AtomicReference(DeviceStatus.UnAuthenticate)
AtomicReference(DeviceStatus.UnAuthenticated)
}
/**
......@@ -81,45 +108,58 @@ class Session : ConnectListener, SyncMessageListener<Codec<*>> {
mutableListOf<ByteBuf>()
}
private val listeners by lazy {
mutableSetOf<SessionListener>()
}
private val attributeMap by lazy {
DefaultAttributeMap()
}
private val sheduler by lazy {
private val scheduler by lazy {
Executors.newScheduledThreadPool(1,NamedThreadFactory("Heartbeat"))
}
private var scheduleFutureTask: ScheduledFuture<*>?=null
private val mOperateLock by lazy {
ReentrantLock()
}
private val mOperateCondition by lazy {
mOperateLock.newCondition()
}
override fun onDataReceived(buf: ByteBuf) {
parseExecutor.execute { mParser.parse(buf) }
}
override fun onConnect() {
if (deviceStatus.get() == DeviceStatus.UnAuthenticate) {
if (deviceStatus.get() == DeviceStatus.UnAuthenticated) {
tcpManager.disconnect()
return
}
tryLogin(::doDeviceSignUp)
}
/**
* 设备尝试登录
* */
private fun tryLogin(block:()->Boolean){
var retryTimes = 0
var success = false
val isRegistered= Settings.deviceId == DeviceUtils.getUniqueDeviceId() && Settings.token.isNotEmpty() && Settings.isRegistered
if (isRegistered) {
do {
success = doAuthenticate()
retryTimes++
} while (retryTimes < CONNECT_MAX_RETRY && success)
if (!success) {
handleDisconnect(Error.JT808EncodeError)
}
} else {
do {
success = sendDeviceSignUpMessage()
retryTimes++
} while (retryTimes < CONNECT_MAX_RETRY && success)
if (!success) {
handleDisconnect(Error.JT808EncodeError)
}
var success:Boolean
do {
success = block()
retryTimes++
} while (retryTimes < CONNECT_MAX_RETRY && success)
if (!success) {
handleDisconnect(Error.JT808EncodeError)
}
}
/**
* 错误处理逻辑
* */
private fun handleDisconnect(error: Error){
when(error){
Error.IOError,Error.StreamClosed,Error.Timeout,Error.NotConnected ->{
......@@ -129,11 +169,49 @@ class Session : ConnectListener, SyncMessageListener<Codec<*>> {
}
}
fun login(){
if (Settings.isRegistered) {
doAuthenticate()
} else {
sendDeviceSignUpMessage()
/**
* 设备登录
* */
fun login():SignInStatus{
mOperateLock.lock()
try {
deviceStatus.set(DeviceStatus.Registering)
tcpManager.connect(DeviceConfig.HostConfig.host,DeviceConfig.HostConfig.port,Schedule.tcpConnectTimeoutSecs)
return if (mOperateCondition.await(Schedule.signInTimeoutSecs, TimeUnit.SECONDS)) {
when (deviceStatus.get()) {
DeviceStatus.Authenticated -> {
SignInStatus.SignIn
}
else -> {
SignInStatus.SignOut
}
}
} else {
SignInStatus.SignOut
}
} catch (e:InterruptedException) {
loge("登录线程被中断,被中断线程:${Thread.currentThread().name},中断线程:${e.printStackTrace()}")
return SignInStatus.SignOut
}finally {
mOperateLock.unlock()
}
}
/**
* 添加监听器
* */
fun addSessionListener(listener:SessionListener){
synchronized(listeners){
listeners.add(listener)
}
}
/**
* 删除监听器
* */
fun removeSessionListener(listener:SessionListener){
synchronized(listeners){
listeners.remove(listener)
}
}
......@@ -151,13 +229,13 @@ class Session : ConnectListener, SyncMessageListener<Codec<*>> {
request, MessageMetadata.inbounds(), attributeMap,
UnpooledByteBufAllocator.DEFAULT::buffer, buffs::add
)
return doSendMessage("Authenticate Message")
return doSendMessage("设备鉴权")
}
/**
* 发送设备注册消息
* */
private fun sendDeviceSignUpMessage(): Boolean {
private fun doDeviceSignUp(): Boolean {
Settings.isRegistered = false
Settings.deviceId = ""
val b0100 = B0100().apply {
......@@ -175,21 +253,23 @@ class Session : ConnectListener, SyncMessageListener<Codec<*>> {
request, MessageMetadata.inbounds(), attributeMap,
UnpooledByteBufAllocator.DEFAULT::buffer, buffs::add
)
return doSendMessage("SignUp Message")
return doSendMessage("设备注册")
}
/**
* 开始心跳
* */
private fun startHeartbeat(){
scheduleFutureTask = scheduler.scheduleAtFixedRate(::sendPing,0,Schedule.tcpReadIntervalSecs,TimeUnit.SECONDS)
}
/**
* 结束心跳
* */
private fun stopHeartbeat(){
if (scheduleFutureTask?.isDone == false) {
scheduleFutureTask?.cancel(true)
}
}
/**
......@@ -203,54 +283,92 @@ class Session : ConnectListener, SyncMessageListener<Codec<*>> {
request, MessageMetadata.inbounds(), attributeMap,
UnpooledByteBufAllocator.DEFAULT::buffer, buffs::add
)
doSendMessage("HeartBeat message")
doSendMessage("发送心跳")
}
private fun doSendMessage(messageType:String) = if (buffs.isNotEmpty()) {
/**
* 发送心跳
* */
private fun doSendMessage(name:String) = if (buffs.isNotEmpty()) {
val byteArray = ByteArray(buffs[0].readableBytes())
buffs[0].readBytes(byteArray)
if (BuildConfig.DEBUG) {
LogUtils.file("sending $messageType:${HexUtil.dump(byteArray)}")
} else {
LogUtils.d("sending $messageType:${HexUtil.dump(byteArray)}")
}
logd("【$name】:${HexUtil.dump(byteArray)}")
tcpManager.send(byteArray)
buffs.removeAt(0)
synchronized(buffs){
buffs.removeAt(0)
}
true
} else {
if (BuildConfig.DEBUG) {
LogUtils.d("sending $messageType:buffs is empty,Message encode failed!")
} else {
LogUtils.file("sending $messageType:buffs is empty,Message encode failed!")
}
loge("【$name】:buffs is empty,Message encode failed!")
false
}
override fun onDisconnect(error: Error) {
TODO("Not yet implemented")
stopHeartbeat()
logd(error.reason)
}
override fun onRegistered(data: Codec<*>) {
override fun onSignUp(data: Codec<*>) {
Settings.deviceId = DeviceUtils.getUniqueDeviceId()
Settings.isRegistered = true
Settings.token = (data as B8100Codec).newInstance().token
doAuthenticate()
}
override fun onAuthenticated(data: Codec<*>) {
val codec = (data as B8001Codec).newInstance()
when(val code = codec.result){
B8001.RESULT_SUCCESSFUL->{
deviceStatus.set(DeviceStatus.Authenticated)
}
}
deviceStatus.set(DeviceStatus.Registered)
tryLogin(::doAuthenticate)
}
override fun onCommonResponse(data: Codec<*>) {
val codec = (data as B8001Codec).newInstance()
when(codec.result){
B8001.RESULT_SUCCESSFUL->{
when (codec.replyId) {
0x0100 ->{
when(codec.result){
B8001.RESULT_SUCCESSFUL->{
deviceStatus.set(DeviceStatus.Registered)
loge("设备注册成功!")
}
B8001.RESULT_FAILED->{
deviceStatus.set(DeviceStatus.Unregistered)
loge("设备注册失败!")
}
B8001.RESULT_WRONG->{
deviceStatus.set(DeviceStatus.Unregistered)
loge("设备注册消息有误!")
}
}
}
0x0102 ->{
when(codec.result){
B8001.RESULT_SUCCESSFUL->{
if (deviceStatus.get() == DeviceStatus.Authenticating) {
deviceStatus.set(DeviceStatus.Authenticated)
startHeartbeat()
loge("设备鉴权成功!")
}else{
deviceStatus.set(DeviceStatus.UnAuthenticated)
loge("设备鉴权失败!")
}
}
B8001.RESULT_FAILED->{
deviceStatus.set(DeviceStatus.UnAuthenticated)
loge("设备鉴权失败!")
}
B8001.RESULT_WRONG->{
deviceStatus.set(DeviceStatus.UnAuthenticated)
loge("设备鉴权消息有误!")
}
}
}
0x0002->{
when(codec.result){
B8001.RESULT_SUCCESSFUL->{
logd("心跳响应成功")
}
B8001.RESULT_FAILED->{
loge("心跳响应失败!")
}
B8001.RESULT_WRONG->{
loge("心跳消息有误!")
}
}
}
}
}
......
......@@ -9,13 +9,7 @@ interface SyncMessageListener<T> {
* 设备注册
* @param data
* */
fun onRegistered(data:T)
/**
* 设备鉴权
* @param data
* */
fun onAuthenticated(data:T)
fun onSignUp(data:T)
/**
* 平台通用应答
......
......@@ -2,6 +2,7 @@ package com.waytous.anticollision.tcp
import com.blankj.utilcode.util.LogUtils
import com.waytous.anticollision.BuildConfig
import com.waytous.anticollision.utils.logd
import io.github.toggery.jt808.codec.*
import io.netty.buffer.ByteBuf
import io.netty.util.DefaultAttributeMap
......@@ -21,9 +22,10 @@ class SyncParser(private val syncMessageListener: SyncMessageListener<Codec<*>>)
fun parse(buf: ByteBuf){
try {
val message: Message<Codec<*>> = Message.decode(buf, MessageMetadata.outbounds(), DefaultAttributeMap())
logd("【解析】:$message")
when(val codec:Codec<*> = message.body){
is B8001Codec -> syncMessageListener.onCommonResponse(codec)
is B8100Codec -> syncMessageListener.onRegistered(codec)
is B8100Codec -> syncMessageListener.onSignUp(codec)
}
LogUtils.d(message.toString())
} catch (e: Exception) {
......
package com.waytous.anticollision.tcp
/**
* tcp通信错误定义
* */
enum class Error(val reason:String = "success") {
NOError,
IOError("io error"),
StreamClosed("stream closed"),
ConnectionRefused("connection refused"),
Timeout("time out"),
NotConnected("not connected"),
UserDisconnected("user disconnected"),
InvalidParam("invalid param"),
ServerUnknownError("server unknown error"),
JT808EncodeError("jt808 codec encode error"),
JT808DecodeError("jt808 codec decode error")
}
......@@ -35,7 +35,7 @@ const val READ_SIZE = 1024
* 链接状态
* @author male
* */
enum class ConnectStatus {
internal enum class State {
/**
* 链接断开
* */
......@@ -112,7 +112,7 @@ internal class TcpManager(
* @param error
* */
fun disconnect(error: Error = Error.UserDisconnected) {
if (connection.connectStatus.get() == ConnectStatus.Disconnected) {
if (connection.connectStatus.get() == State.Disconnected) {
return
}
receiveExecutor.execute {
......@@ -127,7 +127,7 @@ internal class TcpManager(
* @param data
* */
fun send(data: ByteArray) {
if (connection.connectStatus.get() == ConnectStatus.Connected) {
if (connection.connectStatus.get() == State.Connected) {
sendExecutors.execute { connection.send(data) }
}
}
......@@ -160,8 +160,6 @@ internal class TcpManager(
* */
class Connection {
private val TAG = "Connection"
/**
* 接收线程锁
* */
......@@ -176,7 +174,12 @@ internal class TcpManager(
ReentrantLock()
}
private lateinit var socket: Socket
private val socket by lazy {
Socket().apply {
keepAlive = true
receiveBufferSize = BUFFER_SIZE
}
}
/**
* 数据接收流
......@@ -192,7 +195,7 @@ internal class TcpManager(
* 链接状态
* */
val connectStatus by lazy {
AtomicReference(ConnectStatus.Disconnected)
AtomicReference(State.Disconnected)
}
/**
......@@ -213,36 +216,32 @@ internal class TcpManager(
receiveLock.lock()
sendLock.lock()
var errorCode: Error = Error.NOError
if (socket != null && socket.isConnected && connectStatus.get() > ConnectStatus.Disconnected) {
if (socket.isConnected && connectStatus.get() > State.Disconnected) {
errorCode = Error.NOError
} else if (socket != null) {
} else {
closeSocket()
}
connectStatus.set(ConnectStatus.Connecting)
socket = Socket().apply {
soTimeout = receiveInterval
keepAlive = true
receiveBufferSize = BUFFER_SIZE
}
connectStatus.set(State.Connecting)
socket.soTimeout = receiveInterval
socket.connect(
InetSocketAddress(host, port),
if (timeout > 0) timeout else DEFAULT_TIMEOUT
)
inputStream = socket.getInputStream()
outputStream = socket.getOutputStream()
connectStatus.set(ConnectStatus.Connected)
connectStatus.set(State.Connected)
errorCode
} catch (e: IOException) {
connectStatus.set(ConnectStatus.Disconnected)
connectStatus.set(State.Disconnected)
Error.IOError
} catch (e: SocketTimeoutException) {
connectStatus.set(ConnectStatus.Disconnected)
connectStatus.set(State.Disconnected)
Error.Timeout
} catch (e: IllegalArgumentException) {
connectStatus.set(ConnectStatus.Disconnected)
connectStatus.set(State.Disconnected)
Error.InvalidParam
} catch (e: Exception) {
connectStatus.set(ConnectStatus.Disconnected)
connectStatus.set(State.Disconnected)
Error.ServerUnknownError
} finally {
sendLock.unlock()
......@@ -289,7 +288,7 @@ internal class TcpManager(
fun receive(): ReceivedResult {
try {
receiveLock.lock()
if (connectStatus.get() != ConnectStatus.Connected) return ReceivedResult(
if (connectStatus.get() != State.Connected) return ReceivedResult(
Error.NotConnected,
0
)
......@@ -299,22 +298,25 @@ internal class TcpManager(
ReceivedResult(Error.NOError, len)
} else {
if (len == 0) {
connectStatus.set(ConnectStatus.Disconnected)
connectStatus.set(State.Disconnected)
ReceivedResult(Error.StreamClosed, len)
} else {
cleanup()
connectStatus.set(ConnectStatus.Disconnected)
connectStatus.set(State.Disconnected)
ReceivedResult(Error.IOError, len)
}
}
} catch (e: IOException) {
connectStatus.set(ConnectStatus.Disconnected)
connectStatus.set(State.Disconnected)
return ReceivedResult(Error.IOError, -1)
} catch (e: SocketTimeoutException) {
connectStatus.set(State.Disconnected)
return ReceivedResult(Error.Timeout, 0)
} catch (e: IllegalArgumentException) {
connectStatus.set(ConnectStatus.Disconnected)
connectStatus.set(State.Disconnected)
return ReceivedResult(Error.InvalidParam, 0)
} catch (e: Exception) {
connectStatus.set(ConnectStatus.Disconnected)
connectStatus.set(State.Disconnected)
return ReceivedResult(Error.ServerUnknownError, 0)
} finally {
receiveLock.unlock()
......@@ -333,7 +335,7 @@ internal class TcpManager(
receiveLock.lock()
sendLock.lock()
closeSocket()
connectStatus.set(ConnectStatus.Disconnected)
connectStatus.set(State.Disconnected)
} catch (e: Exception) {
if (BuildConfig.DEBUG) {
LogUtils.e("cleanup error:${e.message}")
......
package com.waytous.anticollision.utils
import com.blankj.utilcode.util.LogUtils
import com.waytous.anticollision.BuildConfig
/**
* 连接状态
* */
enum class ConnectStatus {
Disconnected, Connected
}
/**
* 登录状态
* */
enum class SignInStatus {
SignOut, SignIn
}
fun logd(message:String){
if (BuildConfig.DEBUG) {
if (BuildConfig.DEBUG) {
LogUtils.d(message)
} else {
LogUtils.file(message)
}
}
}
fun loge(message:String){
if (BuildConfig.DEBUG) {
if (BuildConfig.DEBUG) {
LogUtils.e(message)
} else {
LogUtils.file(message)
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment