More stream work.

This commit is contained in:
Kiri 2024-07-14 18:40:49 -07:00
parent 08b8cdcf14
commit f0cb6f7c1f
9 changed files with 135 additions and 73 deletions

View File

@ -43,7 +43,7 @@ func _process(_delta):
wrapper_instance.poll() wrapper_instance.poll()
#print(wrapper_instance.get_status()) #print(wrapper_instance.get_status())
wrapper_instance.call_rpc_async("func_to_call", [12345], func(stuff): wrapper_instance.call_rpc_callback("func_to_call", [12345], func(stuff):
print(stuff.response) print(stuff.response)
) )

View File

View File

@ -0,0 +1,13 @@
#!/usr/bin/python3
import time
def some_function_to_call():
print("butts")
a = 5
while a > 0:
a -= 1
print(a)
time.sleep(1.0)

View File

@ -1,23 +1,44 @@
extends Node extends Node
func _ready(): var pw : KiriPythonWrapperInstance
#var pw : KiriPythonWrapperInstance = KiriPythonWrapperInstance.new(
func _ready():
#pw = KiriPythonWrapperInstance.new(
#"/storage/git2/GodotJSONRPCTest/addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/test_module/__init__.py") #"/storage/git2/GodotJSONRPCTest/addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/test_module/__init__.py")
#
#pw.setup_python() #pw.setup_python()
#pw.start_process() #pw.start_process()
#
#var ret = pw.call_rpc_sync("func_to_call", ["test string whatever blah"]) #var ret = pw.call_rpc_sync("func_to_call", ["test string whatever blah"])
#print(ret) #print(ret)
#print("Calling the thing")
#butt("nkc sckccsd scaskcmasklcms")
#print("Done calling the thing:" )
#asdfblah.emit()
#pw.stop_process()
#
pw = KiriPythonWrapperInstance.new(
"res://SomePythonThingy/test_a_thing.py")
pw.setup_python()
pw.start_process()
print("Call thingy...")
var ret = await pw.call_rpc_async("some_function_to_call", [])
print("Done thingy...")
print(ret)
print("Calling the thing") func _process(delta):
butt("nkc sckccsd scaskcmasklcms") #if pw.get_status() == \
print("Done calling the thing:" ) #KiriPythonWrapperInstance.KiriPythonWrapperStatus.STATUS_RUNNING:
asdfblah.emit()
#print("poll")
pw.poll()
#if pw.get_status() == \
#KiriPythonWrapperInstance.KiriPythonWrapperStatus.STATUS_STOPPED:
#print("IT STOPPED")
signal asdfblah signal asdfblah
func butt(asdf): func butt(asdf):
@ -25,3 +46,4 @@ func butt(asdf):
await asdfblah await asdfblah
print("Butt2: ", asdf) print("Butt2: ", asdf)

View File

@ -166,9 +166,9 @@ func _get_cache_path_relative():
# Get the full cache path, as understood by the OS. # Get the full cache path, as understood by the OS.
# Example return value: # Example return value:
# "/home/kiri/.local/share/godot/app_userdata/GodotJSONRPCTest/_python_dist/20240415/3.12.3" # "/home/kiri/.local/share/godot/app_userdata/GodotJSONRPCTest/_python_dist/20240415/3.12.3/packaged_scripts"
func _get_cache_path_system() -> String: func _get_script_cache_path_system() -> String:
return OS.get_user_data_dir().path_join(_get_cache_path_relative()) return OS.get_user_data_dir().path_join(_get_cache_path_relative()).path_join("packaged_scripts")
# Get the full cache path, as understood by Godot. # Get the full cache path, as understood by Godot.
# Example return value: # Example return value:

View File

@ -13,7 +13,6 @@ var _packet_buffer : KiriPacketBuffer = KiriPacketBuffer.new()
var _state : KiriSocketState = KiriSocketState.DISCONNECTED var _state : KiriSocketState = KiriSocketState.DISCONNECTED
var _outgoing_packet_queue : Array = [] var _outgoing_packet_queue : Array = []
var _state_lock : Mutex = Mutex.new()
var _worker_thread : bool = false var _worker_thread : bool = false
signal _worker_thread_should_continue signal _worker_thread_should_continue
@ -66,47 +65,30 @@ enum KiriSocketState {
ERROR = 5 ERROR = 5
} }
func _notification(what):
if what == NOTIFICATION_PREDELETE:
assert(not _worker_thread)
func send_packet(packet_bytes : PackedByteArray): func send_packet(packet_bytes : PackedByteArray):
assert(packet_bytes) assert(packet_bytes)
_state_lock.lock()
_outgoing_packet_queue.append(packet_bytes) _outgoing_packet_queue.append(packet_bytes)
_state_lock.unlock()
func poll(): func poll():
_worker_thread_should_continue.emit() _worker_thread_should_continue.emit()
func get_next_packet(): func get_next_packet():
poll() poll()
_state_lock.lock()
var ret = _packet_buffer.get_next_packet() var ret = _packet_buffer.get_next_packet()
_state_lock.unlock()
return ret return ret
func get_next_server_connection(): func get_next_server_connection():
poll() poll()
_state_lock.lock()
var ret = null var ret = null
if len(_new_connections_to_server) > 0: if len(_new_connections_to_server) > 0:
ret = _new_connections_to_server.pop_front() ret = _new_connections_to_server.pop_front()
_state_lock.unlock()
return ret return ret
func get_last_error(): func get_last_error():
_state_lock.lock()
var ret = _error_string var ret = _error_string
_state_lock.unlock()
return ret return ret
func is_disconnected_or_error(): func is_disconnected_or_error():
_state_lock.lock()
var bad_states = [ var bad_states = [
KiriSocketState.DISCONNECTED, KiriSocketState.DISCONNECTED,
KiriSocketState.ERROR KiriSocketState.ERROR
@ -115,15 +97,11 @@ func is_disconnected_or_error():
var ret : bool = false var ret : bool = false
if _state in bad_states: if _state in bad_states:
ret = true ret = true
_state_lock.unlock()
return ret return ret
func get_state(): func get_state():
_state_lock.lock()
var ret = _state var ret = _state
_state_lock.unlock()
return ret return ret
func start_server(address): func start_server(address):
@ -149,7 +127,9 @@ func start_client(address):
func stop(): func stop():
assert(_worker_thread) if not _worker_thread:
return
_should_quit = true _should_quit = true
while _worker_thread: while _worker_thread:
_worker_thread_should_continue.emit() _worker_thread_should_continue.emit()
@ -167,17 +147,14 @@ func _normal_communication_loop_iteration(sock : StreamPeer, address):
return FAILED return FAILED
# Get new data. # Get new data.
_state_lock.lock()
var available_bytes = sock.get_available_bytes() var available_bytes = sock.get_available_bytes()
if available_bytes > 0: if available_bytes > 0:
var incoming_bytes = sock.get_data(available_bytes) var incoming_bytes = sock.get_data(available_bytes)
_packet_buffer.add_bytes(PackedByteArray(incoming_bytes[1])) _packet_buffer.add_bytes(PackedByteArray(incoming_bytes[1]))
if incoming_bytes[0] != OK: if incoming_bytes[0] != OK:
return FAILED return FAILED
_state_lock.unlock()
# Send all packets from queue. # Send all packets from queue.
_state_lock.lock()
while len(self._outgoing_packet_queue): while len(self._outgoing_packet_queue):
var next_outgoing_packet = _outgoing_packet_queue.pop_front() var next_outgoing_packet = _outgoing_packet_queue.pop_front()
var len_to_send = len(next_outgoing_packet) var len_to_send = len(next_outgoing_packet)
@ -186,7 +163,6 @@ func _normal_communication_loop_iteration(sock : StreamPeer, address):
sock.put_u8((len_to_send & 0x00ff0000) >> 16) sock.put_u8((len_to_send & 0x00ff0000) >> 16)
sock.put_u8((len_to_send & 0xff000000) >> 24) sock.put_u8((len_to_send & 0xff000000) >> 24)
sock.put_data(next_outgoing_packet) sock.put_data(next_outgoing_packet)
_state_lock.unlock()
return OK return OK
@ -216,23 +192,22 @@ func _client_thread_func(address):
else: else:
_set_state(KiriSocketState.ERROR, "Connection failed") _set_state(KiriSocketState.ERROR, "Connection failed")
sock.close()
_worker_thread = false _worker_thread = false
func _set_state(state : KiriSocketState, error_string=null): func _set_state(state : KiriSocketState, error_string=null):
_state_lock.lock()
_state = state _state = state
if _state == KiriSocketState.ERROR: if _state == KiriSocketState.ERROR:
assert(error_string) assert(error_string)
_error_string = error_string _error_string = error_string
else: else:
assert(not error_string) assert(not error_string)
_error_string = "" _error_string = ""
_state_lock.unlock()
func _server_to_client_thread_func(connection : StreamPeerTCP, address): func _server_to_client_thread_func(connection : StreamPeerTCP, address):
print("_server_to_client_thread_func start")
_set_state(KiriSocketState.CONNECTED) _set_state(KiriSocketState.CONNECTED)
while not _should_quit: while not _should_quit:
@ -251,7 +226,10 @@ func _server_to_client_thread_func(connection : StreamPeerTCP, address):
if get_state() == KiriSocketState.CONNECTED: if get_state() == KiriSocketState.CONNECTED:
_set_state(KiriSocketState.DISCONNECTED) _set_state(KiriSocketState.DISCONNECTED)
connection.disconnect_from_host()
_worker_thread = false _worker_thread = false
print("_server_to_client_thread_func stop")
func _server_thread_func(address): func _server_thread_func(address):
@ -279,9 +257,15 @@ func _server_thread_func(address):
var connection : StreamPeerTCP = sock.take_connection() var connection : StreamPeerTCP = sock.take_connection()
var new_client : KiriPacketSocket = KiriPacketSocket.new() var new_client : KiriPacketSocket = KiriPacketSocket.new()
new_client._start_client_connection_from_server(connection, address) new_client._start_client_connection_from_server(connection, address)
_state_lock.lock()
_new_connections_to_server.append(new_client) _new_connections_to_server.append(new_client)
_state_lock.unlock()
sock.stop()
sock = null
# Close all connections that were waiting to be accepted.
for c in _new_connections_to_server:
c.stop()
_new_connections_to_server = []
_worker_thread = false _worker_thread = false
@ -293,5 +277,11 @@ func _start_client_connection_from_server(connection : StreamPeerTCP, address):
# Coroutine call. # Coroutine call.
_server_to_client_thread_func(connection, address) _server_to_client_thread_func(connection, address)
func _notification(what):
if what == NOTIFICATION_PREDELETE:
# Well, this is horrible.
if self:
if is_running():
stop()

View File

@ -34,6 +34,8 @@ var _build_wrangler : KiriPythonBuildWrangler = null
var _external_process_pid = -1 var _external_process_pid = -1
signal _rpc_async_response_received
func _init(python_file_path : String): func _init(python_file_path : String):
_build_wrangler = KiriPythonBuildWrangler.new() _build_wrangler = KiriPythonBuildWrangler.new()
@ -50,7 +52,7 @@ func _get_wrapper_script():
script_dirname + "/KiriPythonRPCWrapper_start.py") script_dirname + "/KiriPythonRPCWrapper_start.py")
func _get_wrapper_cache_path() -> String: func _get_wrapper_cache_path() -> String:
return _build_wrangler._get_cache_path_godot().path_join("KiriPythonRPCWrapper") return _build_wrangler._get_cache_path_godot().path_join("packaged_scripts")
func _get_wrapper_script_cache_path() -> String: func _get_wrapper_script_cache_path() -> String:
return _get_wrapper_cache_path().path_join("addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/__init__.py") return _get_wrapper_cache_path().path_join("addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/__init__.py")
@ -90,12 +92,22 @@ func get_status():
return KiriPythonWrapperStatus.STATUS_RUNNING return KiriPythonWrapperStatus.STATUS_RUNNING
func start_process(): func start_process(open_terminal : bool = false):
# FIXME: Make sure we don't have one running. # FIXME: Make sure we don't have one running.
var open_port = 9500 var open_port = 9500
# Convert Python script path into a real path on the system.
var real_python_script_path = python_script_path
if real_python_script_path.begins_with("res://"):
real_python_script_path = _build_wrangler._get_script_cache_path_system().path_join(
real_python_script_path.substr(len("res://")))
else:
real_python_script_path = ProjectSettings.globalize_path(
real_python_script_path)
print("REAL PATH: ", real_python_script_path)
assert(not _server_packet_socket) assert(not _server_packet_socket)
_server_packet_socket = KiriPacketSocket.new() _server_packet_socket = KiriPacketSocket.new()
while true: while true:
@ -114,25 +126,27 @@ func start_process():
_server_packet_socket.stop() _server_packet_socket.stop()
open_port += 1 open_port += 1
print("Port: ", open_port)
var python_exe_path : String = _get_python_executable() var python_exe_path : String = _get_python_executable()
var wrapper_script_path : String = \ var wrapper_script_path : String = \
ProjectSettings.globalize_path(_get_wrapper_script_cache_path()) ProjectSettings.globalize_path(_get_wrapper_script_cache_path())
var startup_command : Array = [ var startup_command : Array = [
"xterm", "-e",
python_exe_path, python_exe_path,
wrapper_script_path, wrapper_script_path,
"--script", python_script_path, "--script", real_python_script_path,
"--port", open_port] "--port", open_port]
print("startup command: ", startup_command) if open_terminal:
if OS.get_name() == "Linux":
startup_command = ["xterm", "-e"] + startup_command
#print("startup command: ", startup_command)
_external_process_pid = OS.create_process( _external_process_pid = OS.create_process(
startup_command[0], startup_command.slice(1), true) startup_command[0], startup_command.slice(1),
open_terminal)
print("external process: ", _external_process_pid) #print("external process: ", _external_process_pid)
func stop_process(): func stop_process():
@ -149,7 +163,7 @@ func stop_process():
communication_packet_socket.stop() communication_packet_socket.stop()
communication_packet_socket = null communication_packet_socket = null
func call_rpc_async(method : String, args : Variant, callback = null) -> int: func call_rpc_callback(method : String, args : Variant, callback = null) -> int:
assert((args is Dictionary) or (args is Array)) assert((args is Dictionary) or (args is Array))
assert((callback == null) or (callback is Callable)) assert((callback == null) or (callback is Callable))
@ -165,6 +179,24 @@ func call_rpc_async(method : String, args : Variant, callback = null) -> int:
return new_request.id return new_request.id
func call_rpc_async(method : String, args : Variant):
var request_id = call_rpc_callback(method, args, func(request_ob):
_rpc_async_response_received.emit(request_ob)
)
# Wait (block) until we get a response.
while true:
var rpc_response = await _rpc_async_response_received
if not rpc_response:
push_error("Error happened while waiting for RPC response in async call.")
break
if rpc_response.id == request_id:
return rpc_response.response
return null
func call_rpc_sync(method : String, args : Variant): func call_rpc_sync(method : String, args : Variant):
# Kinda hacky. We're using arrays because we can change the contents. # Kinda hacky. We're using arrays because we can change the contents.
@ -173,7 +205,7 @@ func call_rpc_sync(method : String, args : Variant):
# ones. # ones.
var done_array = [false] var done_array = [false]
var response_list = [] var response_list = []
var request_id = call_rpc_async(method, args, func(request_ob): var request_id = call_rpc_callback(method, args, func(request_ob):
done_array[0] = true done_array[0] = true
response_list.append(request_ob.response) response_list.append(request_ob.response)
) )
@ -184,6 +216,7 @@ func call_rpc_sync(method : String, args : Variant):
# Bail out if something happened to our instance or connection to it. # Bail out if something happened to our instance or connection to it.
if communication_packet_socket: if communication_packet_socket:
if communication_packet_socket.is_disconnected_or_error(): if communication_packet_socket.is_disconnected_or_error():
push_error("Disconnected from RPC client while waiting for response.")
break break
poll() poll()
@ -205,6 +238,14 @@ func poll():
if communication_packet_socket: if communication_packet_socket:
if communication_packet_socket.is_disconnected_or_error():
# Tell any awaiting async calls that they're never getting an
# answer. So sad.
_rpc_async_response_received.emit(null)
stop_process()
push_error("poll(): Disconnected from RPC client.")
return
# Send all waiting requests # Send all waiting requests
for request_id in _active_request_queue: for request_id in _active_request_queue:
var request : KiriPythonWrapperActiveRequest = _active_request_queue[request_id] var request : KiriPythonWrapperActiveRequest = _active_request_queue[request_id]
@ -237,6 +278,8 @@ func poll():
_active_request_queue[request_id] _active_request_queue[request_id]
if "result" in packet_dict: if "result" in packet_dict:
request.response = packet_dict["result"] request.response = packet_dict["result"]
elif "error" in packet_dict:
push_error(packet_dict["error"])
else: else:
request.error_response = "Couldn't find result on packet." request.error_response = "Couldn't find result on packet."
if request.callback: if request.callback:

View File

@ -6,7 +6,7 @@
# Godot supports loading files with GZip and Zstandard compression, but only # Godot supports loading files with GZip and Zstandard compression, but only
# files that it's saved (with a header/footer), so it can't load normal .tar.gz # files that it's saved (with a header/footer), so it can't load normal .tar.gz
# or .tar.zst files. It can load zips, though. # or .tar.zst files. It can load zips, though.
# #tar
# DO NOT USE THIS ON UNTRUSTED DATA. # DO NOT USE THIS ON UNTRUSTED DATA.
extends RefCounted extends RefCounted
@ -237,8 +237,10 @@ func open(path: String) -> Error:
if merged_paxheader.has("path"): if merged_paxheader.has("path"):
tar_record.filename = merged_paxheader["path"] tar_record.filename = merged_paxheader["path"]
print("fixing path for paxheader: ", tar_record.filename)
if merged_paxheader.has("linkpath"):
tar_record.link_destination = merged_paxheader["linkpath"]
# Add it to our record list. # Add it to our record list.
_internal_file_list.append(tar_record) _internal_file_list.append(tar_record)

View File

@ -5,24 +5,16 @@ Done:
x remove parent_pid from wrapper script x remove parent_pid from wrapper script
x remove KiriPythonRPCWrapper_start.py x remove KiriPythonRPCWrapper_start.py
x remove test_rpc.py x remove test_rpc.py
x Un-thread the GDScript side of PacketSocket.
x Fix whatever this is: <stuff was here>
x example Python module from OUTSIDE the addon
x Remove xterm dependency, or make it like a debug-only thing.
The big ones: The big ones:
- Un-thread the GDScript side of PacketSocket.
- Fix whatever this is:
SCRIPT ERROR: Assertion failed.
at: KiriPacketSocket._notification (res://addons/KiriPythonRPCWrapper/KiriPacketSocket/KiriPacketSocket.gd:70)
WARNING: A Thread object is being destroyed without its completion having been realized.
Please call wait_to_finish() on it to ensure correct cleanup.
at: ~Thread (core/os/thread.cpp:102)
- First-time setup of requirements (pip, etc). - First-time setup of requirements (pip, etc).
- Remove xterm dependency, or make it like a debug-only thing.
- Test on WINE/Windows. - Test on WINE/Windows.
- Documentation. - Documentation.
- how to use .kiri_export_python - how to use .kiri_export_python
- example Python module from OUTSIDE the addon