Commit 84d544b

mo khan <mo@mokhan.ca>
2025-08-28 16:55:04
fix: parse SSE messages from OpenAI completions API
1 parent a6fef70
Changed files (2)
lib
lib/elelem/states/working.rb
@@ -8,10 +8,7 @@ module Elelem
           state = Waiting.new(agent)
 
           loop do
-            agent.api.chat(agent.conversation.history) do |chunk|
-              response = JSON.parse(chunk)
-              message = normalize(response["message"] || {})
-
+            agent.api.chat(agent.conversation.history) do |message|
               agent.logger.debug("#{state.display_name}: #{message}")
               state = state.run(message)
             end
@@ -22,10 +19,6 @@ module Elelem
 
           agent.transition_to(States::Idle.new)
         end
-
-        def normalize(message)
-          message.reject { |_key, value| value.empty? }
-        end
       end
     end
   end
lib/elelem/api.rb
@@ -12,18 +12,53 @@ module Elelem
     def chat(messages, &block)
       Net::HTTP.start(uri.hostname, uri.port, http_options) do |http|
         http.request(build_request(messages)) do |response|
-          if response.is_a?(Net::HTTPSuccess)
-            response.read_body(&block)
-          else
+          if !response.is_a?(Net::HTTPSuccess)
             configuration.logger.error(response.inspect)
             raise response.inspect
           end
+
+          buffer = ""
+          response.read_body do |chunk|
+            configuration.logger.debug(chunk)
+            buffer += chunk
+
+            while (message = extract_sse_message(buffer))
+              next if message.empty? || message == "[DONE]"
+
+              configuration.logger.debug(message)
+              json = JSON.parse(message)
+              block.call(normalize(json.dig("choices", 0, "delta")))
+            end
+          end
         end
       end
     end
 
     private
 
+    def extract_sse_message(buffer)
+      message_end = buffer.index("\n\n")
+      return nil unless message_end
+
+      message_data = buffer[0...message_end]
+      buffer.replace(buffer[(message_end + 2)..-1] || "")
+
+      data_lines = message_data.split("\n").filter_map do |line|
+        if line.start_with?("data: ")
+          line[6..-1]
+        elsif line == "data:"
+          ""
+        end
+      end
+
+      return "" if data_lines.empty?
+      data_lines.join("\n")
+    end
+
+    def normalize(message)
+      message.reject { |_key, value| value.empty? }
+    end
+
     def http_options
       {
         open_timeout: 10,
@@ -43,11 +78,10 @@ module Elelem
         host = scheme + raw_host
       end
 
-      URI("#{host.sub(%r{/?$}, "")}/api/chat")
+      URI("#{host.sub(%r{/?$}, "")}/v1/chat/completions")
     end
 
     def build_request(messages)
-      timestamp = Time.now.to_i
       Net::HTTP::Post.new(uri).tap do |request|
         request["Accept"] = "application/json"
         request["Content-Type"] = "application/json"
@@ -61,8 +95,7 @@ module Elelem
         messages: messages,
         model: configuration.model,
         stream: true,
-        keep_alive: "5m",
-        options: { temperature: 0.1 },
+        temperature: 0.1,
         tools: configuration.tools.to_h
       }.tap do |payload|
         configuration.logger.debug(JSON.pretty_generate(payload))