From f0cb6f7c1f0b7ed2148d17523e1d6761cbaaed6b Mon Sep 17 00:00:00 2001 From: Kiri Date: Sun, 14 Jul 2024 18:40:49 -0700 Subject: [PATCH] More stream work. --- Control.gd | 2 +- SomePythonThingy/.kiri_export_python | 0 SomePythonThingy/test_a_thing.py | 13 ++++ TestPythonInExport.gd | 44 +++++++++---- .../KiriPythonBuildWrangler.gd | 6 +- .../KiriPacketSocket/KiriPacketSocket.gd | 56 +++++++--------- .../KiriPythonWrapperInstance.gd | 65 +++++++++++++++---- addons/KiriPythonRPCWrapper/KiriTARReader.gd | 6 +- addons/KiriPythonRPCWrapper/TODO.md | 16 ++--- 9 files changed, 135 insertions(+), 73 deletions(-) create mode 100644 SomePythonThingy/.kiri_export_python create mode 100644 SomePythonThingy/test_a_thing.py diff --git a/Control.gd b/Control.gd index 37d1cb0..50c2567 100644 --- a/Control.gd +++ b/Control.gd @@ -43,7 +43,7 @@ func _process(_delta): wrapper_instance.poll() #print(wrapper_instance.get_status()) - wrapper_instance.call_rpc_async("func_to_call", [12345], func(stuff): + wrapper_instance.call_rpc_callback("func_to_call", [12345], func(stuff): print(stuff.response) ) diff --git a/SomePythonThingy/.kiri_export_python b/SomePythonThingy/.kiri_export_python new file mode 100644 index 0000000..e69de29 diff --git a/SomePythonThingy/test_a_thing.py b/SomePythonThingy/test_a_thing.py new file mode 100644 index 0000000..8c3c96e --- /dev/null +++ b/SomePythonThingy/test_a_thing.py @@ -0,0 +1,13 @@ +#!/usr/bin/python3 + +import time + +def some_function_to_call(): + print("butts") + + a = 5 + while a > 0: + a -= 1 + print(a) + time.sleep(1.0) + diff --git a/TestPythonInExport.gd b/TestPythonInExport.gd index 6a2bd26..3383fc5 100644 --- a/TestPythonInExport.gd +++ b/TestPythonInExport.gd @@ -1,23 +1,44 @@ extends Node -func _ready(): +var pw : KiriPythonWrapperInstance - #var pw : KiriPythonWrapperInstance = KiriPythonWrapperInstance.new( + +func _ready(): + #pw = KiriPythonWrapperInstance.new( #"/storage/git2/GodotJSONRPCTest/addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/test_module/__init__.py") -# #pw.setup_python() #pw.start_process() -# #var ret = pw.call_rpc_sync("func_to_call", ["test string whatever blah"]) #print(ret) + #print("Calling the thing") + #butt("nkc sckccsd scaskcmasklcms") + #print("Done calling the thing:" ) + #asdfblah.emit() + #pw.stop_process() + # + + + + + pw = KiriPythonWrapperInstance.new( + "res://SomePythonThingy/test_a_thing.py") + pw.setup_python() + pw.start_process() + print("Call thingy...") + var ret = await pw.call_rpc_async("some_function_to_call", []) + print("Done thingy...") + print(ret) - print("Calling the thing") - butt("nkc sckccsd scaskcmasklcms") - print("Done calling the thing:" ) - asdfblah.emit() - - - +func _process(delta): + #if pw.get_status() == \ + #KiriPythonWrapperInstance.KiriPythonWrapperStatus.STATUS_RUNNING: + + #print("poll") + pw.poll() + + #if pw.get_status() == \ + #KiriPythonWrapperInstance.KiriPythonWrapperStatus.STATUS_STOPPED: + #print("IT STOPPED") signal asdfblah func butt(asdf): @@ -25,3 +46,4 @@ func butt(asdf): await asdfblah print("Butt2: ", asdf) + diff --git a/addons/KiriPythonRPCWrapper/KiriPythonBuildWrangler.gd b/addons/KiriPythonRPCWrapper/KiriPythonBuildWrangler.gd index b1fdeeb..d43b44c 100644 --- a/addons/KiriPythonRPCWrapper/KiriPythonBuildWrangler.gd +++ b/addons/KiriPythonRPCWrapper/KiriPythonBuildWrangler.gd @@ -166,9 +166,9 @@ func _get_cache_path_relative(): # Get the full cache path, as understood by the OS. # Example return value: -# "/home/kiri/.local/share/godot/app_userdata/GodotJSONRPCTest/_python_dist/20240415/3.12.3" -func _get_cache_path_system() -> String: - return OS.get_user_data_dir().path_join(_get_cache_path_relative()) +# "/home/kiri/.local/share/godot/app_userdata/GodotJSONRPCTest/_python_dist/20240415/3.12.3/packaged_scripts" +func _get_script_cache_path_system() -> String: + return OS.get_user_data_dir().path_join(_get_cache_path_relative()).path_join("packaged_scripts") # Get the full cache path, as understood by Godot. # Example return value: diff --git a/addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/KiriPacketSocket/KiriPacketSocket.gd b/addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/KiriPacketSocket/KiriPacketSocket.gd index 473f6fc..2b78e21 100644 --- a/addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/KiriPacketSocket/KiriPacketSocket.gd +++ b/addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/KiriPacketSocket/KiriPacketSocket.gd @@ -13,7 +13,6 @@ var _packet_buffer : KiriPacketBuffer = KiriPacketBuffer.new() var _state : KiriSocketState = KiriSocketState.DISCONNECTED var _outgoing_packet_queue : Array = [] -var _state_lock : Mutex = Mutex.new() var _worker_thread : bool = false signal _worker_thread_should_continue @@ -66,47 +65,30 @@ enum KiriSocketState { ERROR = 5 } -func _notification(what): - if what == NOTIFICATION_PREDELETE: - assert(not _worker_thread) - func send_packet(packet_bytes : PackedByteArray): assert(packet_bytes) - _state_lock.lock() _outgoing_packet_queue.append(packet_bytes) - _state_lock.unlock() func poll(): _worker_thread_should_continue.emit() func get_next_packet(): - poll() - - _state_lock.lock() var ret = _packet_buffer.get_next_packet() - _state_lock.unlock() return ret func get_next_server_connection(): - poll() - - _state_lock.lock() var ret = null if len(_new_connections_to_server) > 0: ret = _new_connections_to_server.pop_front() - _state_lock.unlock() return ret func get_last_error(): - _state_lock.lock() var ret = _error_string - _state_lock.unlock() return ret func is_disconnected_or_error(): - _state_lock.lock() var bad_states = [ KiriSocketState.DISCONNECTED, KiriSocketState.ERROR @@ -115,15 +97,11 @@ func is_disconnected_or_error(): var ret : bool = false if _state in bad_states: ret = true - - _state_lock.unlock() return ret func get_state(): - _state_lock.lock() var ret = _state - _state_lock.unlock() return ret func start_server(address): @@ -149,7 +127,9 @@ func start_client(address): func stop(): - assert(_worker_thread) + if not _worker_thread: + return + _should_quit = true while _worker_thread: _worker_thread_should_continue.emit() @@ -167,17 +147,14 @@ func _normal_communication_loop_iteration(sock : StreamPeer, address): return FAILED # Get new data. - _state_lock.lock() var available_bytes = sock.get_available_bytes() if available_bytes > 0: var incoming_bytes = sock.get_data(available_bytes) _packet_buffer.add_bytes(PackedByteArray(incoming_bytes[1])) if incoming_bytes[0] != OK: return FAILED - _state_lock.unlock() # Send all packets from queue. - _state_lock.lock() while len(self._outgoing_packet_queue): var next_outgoing_packet = _outgoing_packet_queue.pop_front() var len_to_send = len(next_outgoing_packet) @@ -186,7 +163,6 @@ func _normal_communication_loop_iteration(sock : StreamPeer, address): sock.put_u8((len_to_send & 0x00ff0000) >> 16) sock.put_u8((len_to_send & 0xff000000) >> 24) sock.put_data(next_outgoing_packet) - _state_lock.unlock() return OK @@ -216,23 +192,22 @@ func _client_thread_func(address): else: _set_state(KiriSocketState.ERROR, "Connection failed") + sock.close() _worker_thread = false func _set_state(state : KiriSocketState, error_string=null): - _state_lock.lock() _state = state - if _state == KiriSocketState.ERROR: assert(error_string) _error_string = error_string else: assert(not error_string) _error_string = "" - - _state_lock.unlock() func _server_to_client_thread_func(connection : StreamPeerTCP, address): + print("_server_to_client_thread_func start") + _set_state(KiriSocketState.CONNECTED) while not _should_quit: @@ -251,7 +226,10 @@ func _server_to_client_thread_func(connection : StreamPeerTCP, address): if get_state() == KiriSocketState.CONNECTED: _set_state(KiriSocketState.DISCONNECTED) + connection.disconnect_from_host() _worker_thread = false + + print("_server_to_client_thread_func stop") func _server_thread_func(address): @@ -279,9 +257,15 @@ func _server_thread_func(address): var connection : StreamPeerTCP = sock.take_connection() var new_client : KiriPacketSocket = KiriPacketSocket.new() new_client._start_client_connection_from_server(connection, address) - _state_lock.lock() _new_connections_to_server.append(new_client) - _state_lock.unlock() + + sock.stop() + sock = null + + # Close all connections that were waiting to be accepted. + for c in _new_connections_to_server: + c.stop() + _new_connections_to_server = [] _worker_thread = false @@ -293,5 +277,11 @@ func _start_client_connection_from_server(connection : StreamPeerTCP, address): # Coroutine call. _server_to_client_thread_func(connection, address) +func _notification(what): + if what == NOTIFICATION_PREDELETE: + # Well, this is horrible. + if self: + if is_running(): + stop() diff --git a/addons/KiriPythonRPCWrapper/KiriPythonWrapperInstance.gd b/addons/KiriPythonRPCWrapper/KiriPythonWrapperInstance.gd index be8636a..2f139e0 100644 --- a/addons/KiriPythonRPCWrapper/KiriPythonWrapperInstance.gd +++ b/addons/KiriPythonRPCWrapper/KiriPythonWrapperInstance.gd @@ -34,6 +34,8 @@ var _build_wrangler : KiriPythonBuildWrangler = null var _external_process_pid = -1 +signal _rpc_async_response_received + func _init(python_file_path : String): _build_wrangler = KiriPythonBuildWrangler.new() @@ -50,7 +52,7 @@ func _get_wrapper_script(): script_dirname + "/KiriPythonRPCWrapper_start.py") func _get_wrapper_cache_path() -> String: - return _build_wrangler._get_cache_path_godot().path_join("KiriPythonRPCWrapper") + return _build_wrangler._get_cache_path_godot().path_join("packaged_scripts") func _get_wrapper_script_cache_path() -> String: return _get_wrapper_cache_path().path_join("addons/KiriPythonRPCWrapper/KiriPythonRPCWrapper/__init__.py") @@ -90,12 +92,22 @@ func get_status(): return KiriPythonWrapperStatus.STATUS_RUNNING -func start_process(): +func start_process(open_terminal : bool = false): # FIXME: Make sure we don't have one running. var open_port = 9500 + # Convert Python script path into a real path on the system. + var real_python_script_path = python_script_path + if real_python_script_path.begins_with("res://"): + real_python_script_path = _build_wrangler._get_script_cache_path_system().path_join( + real_python_script_path.substr(len("res://"))) + else: + real_python_script_path = ProjectSettings.globalize_path( + real_python_script_path) + print("REAL PATH: ", real_python_script_path) + assert(not _server_packet_socket) _server_packet_socket = KiriPacketSocket.new() while true: @@ -114,25 +126,27 @@ func start_process(): _server_packet_socket.stop() open_port += 1 - print("Port: ", open_port) - var python_exe_path : String = _get_python_executable() var wrapper_script_path : String = \ ProjectSettings.globalize_path(_get_wrapper_script_cache_path()) var startup_command : Array = [ - "xterm", "-e", python_exe_path, wrapper_script_path, - "--script", python_script_path, + "--script", real_python_script_path, "--port", open_port] - print("startup command: ", startup_command) + if open_terminal: + if OS.get_name() == "Linux": + startup_command = ["xterm", "-e"] + startup_command + + #print("startup command: ", startup_command) _external_process_pid = OS.create_process( - startup_command[0], startup_command.slice(1), true) + startup_command[0], startup_command.slice(1), + open_terminal) - print("external process: ", _external_process_pid) + #print("external process: ", _external_process_pid) func stop_process(): @@ -149,7 +163,7 @@ func stop_process(): communication_packet_socket.stop() communication_packet_socket = null -func call_rpc_async(method : String, args : Variant, callback = null) -> int: +func call_rpc_callback(method : String, args : Variant, callback = null) -> int: assert((args is Dictionary) or (args is Array)) assert((callback == null) or (callback is Callable)) @@ -165,6 +179,24 @@ func call_rpc_async(method : String, args : Variant, callback = null) -> int: return new_request.id +func call_rpc_async(method : String, args : Variant): + + var request_id = call_rpc_callback(method, args, func(request_ob): + _rpc_async_response_received.emit(request_ob) + ) + + # Wait (block) until we get a response. + while true: + var rpc_response = await _rpc_async_response_received + if not rpc_response: + push_error("Error happened while waiting for RPC response in async call.") + break + + if rpc_response.id == request_id: + return rpc_response.response + + return null + func call_rpc_sync(method : String, args : Variant): # Kinda hacky. We're using arrays because we can change the contents. @@ -173,7 +205,7 @@ func call_rpc_sync(method : String, args : Variant): # ones. var done_array = [false] var response_list = [] - var request_id = call_rpc_async(method, args, func(request_ob): + var request_id = call_rpc_callback(method, args, func(request_ob): done_array[0] = true response_list.append(request_ob.response) ) @@ -184,6 +216,7 @@ func call_rpc_sync(method : String, args : Variant): # Bail out if something happened to our instance or connection to it. if communication_packet_socket: if communication_packet_socket.is_disconnected_or_error(): + push_error("Disconnected from RPC client while waiting for response.") break poll() @@ -205,6 +238,14 @@ func poll(): if communication_packet_socket: + if communication_packet_socket.is_disconnected_or_error(): + # Tell any awaiting async calls that they're never getting an + # answer. So sad. + _rpc_async_response_received.emit(null) + stop_process() + push_error("poll(): Disconnected from RPC client.") + return + # Send all waiting requests for request_id in _active_request_queue: var request : KiriPythonWrapperActiveRequest = _active_request_queue[request_id] @@ -237,6 +278,8 @@ func poll(): _active_request_queue[request_id] if "result" in packet_dict: request.response = packet_dict["result"] + elif "error" in packet_dict: + push_error(packet_dict["error"]) else: request.error_response = "Couldn't find result on packet." if request.callback: diff --git a/addons/KiriPythonRPCWrapper/KiriTARReader.gd b/addons/KiriPythonRPCWrapper/KiriTARReader.gd index 9769015..01f9f7b 100644 --- a/addons/KiriPythonRPCWrapper/KiriTARReader.gd +++ b/addons/KiriPythonRPCWrapper/KiriTARReader.gd @@ -6,7 +6,7 @@ # Godot supports loading files with GZip and Zstandard compression, but only # files that it's saved (with a header/footer), so it can't load normal .tar.gz # or .tar.zst files. It can load zips, though. -# +#tar # DO NOT USE THIS ON UNTRUSTED DATA. extends RefCounted @@ -237,8 +237,10 @@ func open(path: String) -> Error: if merged_paxheader.has("path"): tar_record.filename = merged_paxheader["path"] - print("fixing path for paxheader: ", tar_record.filename) + if merged_paxheader.has("linkpath"): + tar_record.link_destination = merged_paxheader["linkpath"] + # Add it to our record list. _internal_file_list.append(tar_record) diff --git a/addons/KiriPythonRPCWrapper/TODO.md b/addons/KiriPythonRPCWrapper/TODO.md index 2b7d46b..a1bccd5 100644 --- a/addons/KiriPythonRPCWrapper/TODO.md +++ b/addons/KiriPythonRPCWrapper/TODO.md @@ -5,24 +5,16 @@ Done: x remove parent_pid from wrapper script x remove KiriPythonRPCWrapper_start.py x remove test_rpc.py + x Un-thread the GDScript side of PacketSocket. + x Fix whatever this is: + x example Python module from OUTSIDE the addon + x Remove xterm dependency, or make it like a debug-only thing. The big ones: - - Un-thread the GDScript side of PacketSocket. - - - Fix whatever this is: - SCRIPT ERROR: Assertion failed. - at: KiriPacketSocket._notification (res://addons/KiriPythonRPCWrapper/KiriPacketSocket/KiriPacketSocket.gd:70) -WARNING: A Thread object is being destroyed without its completion having been realized. -Please call wait_to_finish() on it to ensure correct cleanup. - at: ~Thread (core/os/thread.cpp:102) - - First-time setup of requirements (pip, etc). - - Remove xterm dependency, or make it like a debug-only thing. - Test on WINE/Windows. - Documentation. - how to use .kiri_export_python - - example Python module from OUTSIDE the addon -