Working on un-threading the packetsocket.

This commit is contained in:
Kiri 2024-07-14 16:23:39 -07:00
parent 53e4be0439
commit 08b8cdcf14
3 changed files with 62 additions and 20 deletions

View File

@ -2,12 +2,26 @@ extends Node
func _ready(): func _ready():
var pw : KiriPythonWrapperInstance = KiriPythonWrapperInstance.new( #var pw : KiriPythonWrapperInstance = KiriPythonWrapperInstance.new(
"/storage/git2/GodotJSONRPCTest/addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/test_module/__init__.py") #"/storage/git2/GodotJSONRPCTest/addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/test_module/__init__.py")
#
#pw.setup_python()
#pw.start_process()
#
#var ret = pw.call_rpc_sync("func_to_call", ["test string whatever blah"])
#print(ret)
pw.setup_python() print("Calling the thing")
pw.start_process() butt("nkc sckccsd scaskcmasklcms")
print("Done calling the thing:" )
asdfblah.emit()
var ret = pw.call_rpc_sync("func_to_call", ["test string whatever blah"])
print(ret)
signal asdfblah
func butt(asdf):
print("Butt1: ", asdf)
await asdfblah
print("Butt2: ", asdf)

View File

@ -14,7 +14,8 @@ var _state : KiriSocketState = KiriSocketState.DISCONNECTED
var _outgoing_packet_queue : Array = [] var _outgoing_packet_queue : Array = []
var _state_lock : Mutex = Mutex.new() var _state_lock : Mutex = Mutex.new()
var _worker_thread : Thread = null var _worker_thread : bool = false
signal _worker_thread_should_continue
var _new_connections_to_server : Array = [] var _new_connections_to_server : Array = []
var _error_string : String = "" var _error_string : String = ""
@ -75,13 +76,22 @@ func send_packet(packet_bytes : PackedByteArray):
_outgoing_packet_queue.append(packet_bytes) _outgoing_packet_queue.append(packet_bytes)
_state_lock.unlock() _state_lock.unlock()
func poll():
_worker_thread_should_continue.emit()
func get_next_packet(): func get_next_packet():
poll()
_state_lock.lock() _state_lock.lock()
var ret = _packet_buffer.get_next_packet() var ret = _packet_buffer.get_next_packet()
_state_lock.unlock() _state_lock.unlock()
return ret return ret
func get_next_server_connection(): func get_next_server_connection():
poll()
_state_lock.lock() _state_lock.lock()
var ret = null var ret = null
if len(_new_connections_to_server) > 0: if len(_new_connections_to_server) > 0:
@ -121,8 +131,10 @@ func start_server(address):
_set_state(KiriSocketState.SERVER_STARTING) _set_state(KiriSocketState.SERVER_STARTING)
assert(not _worker_thread) assert(not _worker_thread)
_worker_thread = Thread.new() _worker_thread = true
_worker_thread.start(_server_thread_func.bind(address))
# Starts coroutine.
_server_thread_func(address)
func start_client(address): func start_client(address):
@ -130,15 +142,17 @@ func start_client(address):
assert(not _worker_thread) assert(not _worker_thread)
_worker_thread = Thread.new() _worker_thread = true
_worker_thread.start(_client_thread_func.bind(address))
# Starts coroutine.
_client_thread_func(address)
func stop(): func stop():
assert(_worker_thread) assert(_worker_thread)
_should_quit = true _should_quit = true
_worker_thread.wait_to_finish() while _worker_thread:
_worker_thread = null _worker_thread_should_continue.emit()
_should_quit = false _should_quit = false
func is_running(): func is_running():
@ -188,10 +202,12 @@ func _client_thread_func(address):
_set_state(KiriSocketState.CONNECTED) _set_state(KiriSocketState.CONNECTED)
while not _should_quit: while not _should_quit:
await _worker_thread_should_continue
var err = _normal_communication_loop_iteration(sock, address) var err = _normal_communication_loop_iteration(sock, address)
if err != OK: if err != OK:
break break
OS.delay_usec(1)
# We are now disconnected. # We are now disconnected.
_set_state(KiriSocketState.DISCONNECTED) _set_state(KiriSocketState.DISCONNECTED)
@ -200,6 +216,8 @@ func _client_thread_func(address):
else: else:
_set_state(KiriSocketState.ERROR, "Connection failed") _set_state(KiriSocketState.ERROR, "Connection failed")
_worker_thread = false
func _set_state(state : KiriSocketState, error_string=null): func _set_state(state : KiriSocketState, error_string=null):
_state_lock.lock() _state_lock.lock()
_state = state _state = state
@ -216,12 +234,14 @@ func _set_state(state : KiriSocketState, error_string=null):
func _server_to_client_thread_func(connection : StreamPeerTCP, address): func _server_to_client_thread_func(connection : StreamPeerTCP, address):
_set_state(KiriSocketState.CONNECTED) _set_state(KiriSocketState.CONNECTED)
while not _should_quit: while not _should_quit:
await _worker_thread_should_continue
var err = _normal_communication_loop_iteration(connection, address) var err = _normal_communication_loop_iteration(connection, address)
if err != OK: if err != OK:
break break
OS.delay_usec(1)
# FIXME: Missing some error handling here due to exception differences # FIXME: Missing some error handling here due to exception differences
# between Python and GDScript. # between Python and GDScript.
@ -231,6 +251,8 @@ func _server_to_client_thread_func(connection : StreamPeerTCP, address):
if get_state() == KiriSocketState.CONNECTED: if get_state() == KiriSocketState.CONNECTED:
_set_state(KiriSocketState.DISCONNECTED) _set_state(KiriSocketState.DISCONNECTED)
_worker_thread = false
func _server_thread_func(address): func _server_thread_func(address):
while not _should_quit: while not _should_quit:
@ -250,7 +272,9 @@ func _server_thread_func(address):
_set_state(KiriSocketState.SERVER_LISTENING) _set_state(KiriSocketState.SERVER_LISTENING)
while not _should_quit: while not _should_quit:
await _worker_thread_should_continue
if sock.is_connection_available(): if sock.is_connection_available():
var connection : StreamPeerTCP = sock.take_connection() var connection : StreamPeerTCP = sock.take_connection()
var new_client : KiriPacketSocket = KiriPacketSocket.new() var new_client : KiriPacketSocket = KiriPacketSocket.new()
@ -259,12 +283,15 @@ func _server_thread_func(address):
_new_connections_to_server.append(new_client) _new_connections_to_server.append(new_client)
_state_lock.unlock() _state_lock.unlock()
OS.delay_usec(1) _worker_thread = false
func _start_client_connection_from_server(connection : StreamPeerTCP, address): func _start_client_connection_from_server(connection : StreamPeerTCP, address):
assert(not _worker_thread) assert(not _worker_thread)
_worker_thread = Thread.new() _worker_thread = true
_worker_thread.start(_server_to_client_thread_func.bind(connection, address))
# Coroutine call.
_server_to_client_thread_func(connection, address)

View File

@ -11,3 +11,4 @@ def func_to_call(asdf):
def other_func_to_call(): def other_func_to_call():
print("jksdmckjsdncjksncs") print("jksdmckjsdncjksncs")