Samuel Williams Wednesday, 23 January 2019

Ruby has several existing HTTP clients and servers, but none of them support streaming of request and response bodies over both HTTP/1 and HTTP/2. Native support for HTTP/2 is especially important when interacting with remote systems, as the per-request overhead is greatly reduced and a single connection can be used for long-running exchanges. We present async-http, a modern, streaming HTTP client and server supporting HTTP/1, HTTP/2 and SSL out of the box.

Requests and Responses

Async::HTTP provides abstractions over HTTP/1 and HTTP/2 to minimise the complexity of your code. There is a beautiful symmetry: clients send requests and receive responses. Servers receive requests and send responses. This is reflected in the code.

#!/usr/bin/env ruby

require 'async'
require 'async/http/url_endpoint'
require 'async/http/server'
require 'async/http/client'

Async do
	endpoint = Async::HTTP::URLEndpoint.parse("http://localhost:8080")
	
	# Run the server in a separate fiber:
	server_task = Async do
		server = Async::HTTP::Server.for(endpoint) do |request|
			# Receive the request:
			Async::HTTP::Response[200,
				{'content-type' => 'text/plain'},
				["Hello World!"]
			]
		end
		
		server.run
	end
	
	# Create a client:
	client = Async::HTTP::Client.new(endpoint)
	
	# Make the request (it gets wrapped into Async::HTTP::Request):
	response = client.get("/")
	
	# Print out the response:
	puts "Status: #{response.status} (#{response.success?})"
	puts "Headers: #{response.headers.to_h}"
	puts "Body: #{response.read.inspect}"
ensure
	# Stop the server:
	server_task.stop if server_task
	
	# Close any persistent connections:
	client.close if client
end

Of particular interest are the request and response bodies. These are typically arrays of strings (chunks). To enable streaming, you must use a queue of chunks instead.
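
To make the distinction concrete, here is a minimal sketch that uses only Ruby's standard library rather than the Async::HTTP classes. `QueueBody` is a hypothetical name chosen for illustration, and a thread stands in for the fiber that would normally produce the chunks. It assumes the consumer simply iterates over chunks until the producer closes the body.

```ruby
# A fixed body: every chunk exists before the response is sent.
fixed_body = ["Hello ", "World!"]

# A hypothetical streaming body (not part of Async::HTTP): chunks are pushed
# onto a queue by a producer and consumed as they arrive.
class QueueBody
	def initialize
		@queue = Queue.new
	end
	
	# Append a chunk for the consumer.
	def write(chunk)
		@queue.push(chunk)
	end
	
	# Signal that no more chunks will be written.
	def close
		@queue.close
	end
	
	# Yield chunks until the body is closed and drained.
	def each
		while chunk = @queue.pop
			yield chunk
		end
	end
end

streaming_body = QueueBody.new

# The producer writes chunks over time, then closes the body:
producer = Thread.new do
	3.times do |i|
		streaming_body.write("chunk #{i}\n")
	end
	streaming_body.close
end

# Both kinds of body are consumed in the same way:
fixed_chunks = []
fixed_body.each {|chunk| fixed_chunks << chunk}

streamed_chunks = []
streaming_body.each {|chunk| streamed_chunks << chunk}

producer.join
```

Because both bodies respond to `each`, the code that sends them does not need to know whether the chunks already exist or are still being generated.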

Streaming

HTTP allows streaming of both request and response bodies, even at the same time. This allows you to incrementally generate and consume information, which minimises latency and memory usage. Async::HTTP::Body contains the implementations required to stream both request and response bodies, and its usage is very straightforward.

Streaming Response Body

Perhaps the most basic use case is incrementally generating information and streaming it back to the client. A typical example is streaming events or records formatted as newline-delimited JSON.

#!/usr/bin/env ruby

require 'async'
require 'async/http/url_endpoint'
require 'async/http/server'
require 'async/http/client'

# Show what's going on under the hood:
# Async.logger.debug!

require 'json'
APPLICATION_JSON_STREAM = "application/json; boundary=NL".freeze

Async do
	endpoint = Async::HTTP::URLEndpoint.parse("http://localhost:8080", timeout: 5)
	
	# Run the server in a separate fiber:
	server_task = Async do
		server = Async::HTTP::Server.for(endpoint) do |request|
			body = Async::HTTP::Body::Writable.new
			
			Async do |task|
				while true
					duration = rand
					task.sleep duration
					body.write(JSON.dump(["Hello World!", "I slept for #{duration}"]) + "\n")
				end
			end
			
			# Receive the request and respond with the dynamic body:
			Async::HTTP::Response[200,
				{'content-type' => APPLICATION_JSON_STREAM},
				# Stream the generated body to the client:
				body
			]
		end
		
		server.run
	end
	
	# Create a client:
	client = Async::HTTP::Client.new(endpoint)
	
	response = client.get("/")
	
	response.each do |chunk|
		# As the server generates chunks we will receive them.
		# While Async::HTTP guarantees to preserve the chunk boundaries with HTTP/1.1 and HTTP/2, you might prefer to do additional buffering here to improve robustness.
		puts JSON.parse(chunk).inspect
	end
ensure
	# Stop the server:
	server_task.stop if server_task
	
	# Close any persistent connections:
	client.close if client
end
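
The comment in the client loop above suggests additional buffering for robustness. One way to do that, sketched here with the standard library only, is to accumulate chunks and parse a record only once a complete line has arrived. `LineBuffer` and its `feed` method are hypothetical names and not part of Async::HTTP, and the chunks are simulated strings rather than data from a real response.

```ruby
require 'json'

# Hypothetical helper (not part of Async::HTTP): reassembles newline
# delimited JSON records from chunks that may be split at arbitrary points.
class LineBuffer
	def initialize
		@buffer = String.new
	end
	
	# Append a chunk and yield each complete JSON record it contains.
	def feed(chunk)
		@buffer << chunk
		
		while index = @buffer.index("\n")
			# Remove one complete line (including the newline) from the buffer:
			line = @buffer.slice!(0..index)
			line.strip!
			yield JSON.parse(line) unless line.empty?
		end
	end
end

buffer = LineBuffer.new
records = []

# Simulated chunks: the second record is split across two chunks.
chunks = [%Q{["Hello World!", 1]\n["Hel}, %Q{lo World!", 2]\n}]

chunks.each do |chunk|
	buffer.feed(chunk) {|record| records << record}
end
```

In the client above, the same helper could be fed from inside `response.each`, so that parsing no longer depends on how chunks happen to be framed.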

Streaming Request Bodies

Some services accept streaming request bodies, e.g. upload services, incremental processing (voice recognition, video processing), and so on. While we are uploading the content, the remote server can send information back to us, e.g. the current transcript or the objects recognised so far. Here is a simple streaming upper case service:

#!/usr/bin/env ruby

require 'async'
require 'async/http/url_endpoint'
require 'async/http/server'
require 'async/http/client'

# Show what's going on under the hood:
# Async.logger.debug!

Async do
	endpoint = Async::HTTP::URLEndpoint.parse("http://localhost:8080", timeout: 5)
	
	# Run the server in a separate fiber:
	server_task = Async do
		server = Async::HTTP::Server.for(endpoint) do |request|
			body = Async::HTTP::Body::Writable.new
			
			Async do
				request.each do |chunk|
					body.write chunk.upcase
				end
				
			ensure
				body.close
			end
			
			# Receive the request:
			Async::HTTP::Response[200,
				{'content-type' => request.headers['content-type']},
				# Stream the upper-cased body back to the client:
				body
			]
		end
		
		server.run
	end
	
	# Create a client:
	client = Async::HTTP::Client.new(endpoint)
	
	# Create a writable body so the request can be streamed:
	body = Async::HTTP::Body::Writable.new
	
	# Make the request (it gets wrapped into Async::HTTP::Request):
	response = client.post("/", {'content-type' => 'text/plain'}, body)
	
	5.times do
		body.write("ping")
		puts response.body.read.inspect
	end
	
	body.close
	response.finish
ensure
	# Stop the server:
	server_task.stop if server_task
	
	# Close any persistent connections:
	client.close if client
end
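
The interleaving of writes and reads in the client can be easier to follow without the networking. The following is a rough sketch using only the standard library: two queues stand in for the request and response bodies, and a thread stands in for the server. This is an assumption made for illustration, since Async::HTTP uses fibers and its own body classes.

```ruby
# Two queues stand in for the request and response bodies.
request_body = Queue.new
response_body = Queue.new

# The "server": transform each request chunk as soon as it arrives.
server = Thread.new do
	while chunk = request_body.pop
		response_body.push(chunk.upcase)
	end
ensure
	# Closing the response tells the client that the stream has ended.
	response_body.close
end

replies = []

# The "client": each write is answered before the next one is sent.
3.times do
	request_body.push("ping")
	replies << response_body.pop
end

# Closing the request lets the server loop finish:
request_body.close
server.join

# Once the response has been closed and drained, reads return nil.
final = response_body.pop
```

The `ensure` block mirrors the server code above: if the body were never closed, the reader would wait indefinitely for another chunk.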

Echo Server

Because of the symmetry of the implementation, fun services like this become possible:

#!/usr/bin/env ruby

require 'async'
require 'async/http/url_endpoint'
require 'async/http/server'
require 'async/http/client'

# Show what's going on under the hood:
# Async.logger.debug!

Async do
	endpoint = Async::HTTP::URLEndpoint.parse("http://localhost:8080", timeout: 5)
	
	# Run the server in a separate fiber:
	server_task = Async do
		server = Async::HTTP::Server.for(endpoint) do |request|
			# Receive the request:
			Async::HTTP::Response[200,
				{'content-type' => 'text/plain'},
				# We echo the body back to the client:
				request.body
			]
		end
		
		server.run
	end
	
	# Create a client:
	client = Async::HTTP::Client.new(endpoint)
	
	# Create a writable body so the request can be streamed:
	body = Async::HTTP::Body::Writable.new
	
	# Make the request (it gets wrapped into Async::HTTP::Request):
	response = client.post("/", {}, body)
	
	5.times do
		body.write(`fortune`)
		puts "*~" * 40
		puts response.body.read
	end
	
	body.close
	response.finish
ensure
	# Stop the server:
	server_task.stop if server_task
	
	# Close any persistent connections:
	client.close if client
end

99 bottles of beer on the wall

A reasonable question would be: does this work in production? If you use falcon, the answer is YES. The implementation even works with Rack.
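
As a rough illustration of what a streaming Rack application can look like: Rack has traditionally required only that a response body respond to `each` and yield strings, so a body can generate its chunks lazily. The sketch below is an assumption about how such an application might be written, not code from this article. `BottlesBody` and `APP` are hypothetical names, and whether each chunk is flushed to the client as it is produced depends on the server.

```ruby
# Hypothetical lazily generated Rack body: each verse is produced only when
# the server asks for the next chunk.
class BottlesBody
	def initialize(count)
		@count = count
	end
	
	def each
		@count.downto(1) do |i|
			yield "#{i} bottle#{'s' unless i == 1} of beer on the wall\n"
		end
	end
end

# A Rack application is any object that responds to `call(env)` and returns
# [status, headers, body].
APP = lambda do |env|
	[200, {'content-type' => 'text/plain'}, BottlesBody.new(99)]
end

# Call the application directly, as a server would:
status, headers, body = APP.call({})

verses = []
body.each {|verse| verses << verse}
```

In a `config.ru` file, an application like this would be mounted with `run APP`.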

Conclusion

Streaming request and response bodies minimise latency and open up new opportunities for interactivity. We have shown that async-http has a great model for both uni-directional and bi-directional streaming which works on both the client and server side. Such systems can be deployed into production using falcon.
