Ruby has several existing HTTP clients and servers, but none have support for both HTTP/1 and HTTP/2 streaming of request and response bodies. Native support for HTTP/2 is important especially when interacting with remote systems, as the per-request overhead is greatly reduced and it can be used for long-running connections. We present async-http, a modern, streaming HTTP client and server supporting HTTP/1, HTTP/2 and SSL out of the box.
Requests and Responses
Async::HTTP
provides abstractions over HTTP/1 and HTTP/2 to minimise the complexity of your code. There is a beautiful symmetry: clients send requests and receive responses. Servers receive requests and send responses. This is reflected in the code.
#!/usr/bin/env ruby

require 'async'
require 'async/http/url_endpoint'
require 'async/http/server'
require 'async/http/client'

Async do
	endpoint = Async::HTTP::URLEndpoint.parse("http://localhost:8080")
	
	# Run the server in a separate fiber:
	server_task = Async do
		server = Async::HTTP::Server.for(endpoint) do |request|
			# Receive the request:
			Async::HTTP::Response[200,
				{'content-type' => 'text/plain'},
				["Hello World!"]
			]
		end
		
		server.run
	end
	
	# Create a client:
	client = Async::HTTP::Client.new(endpoint)
	
	# Make the request (it gets wrapped into Async::HTTP::Request):
	response = client.get("/")
	
	# Print out the response:
	puts "Status: #{response.status} (#{response.success?})"
	puts "Headers: #{response.headers.to_h}"
	puts "Body: #{response.read.inspect}"
ensure
	# Stop the server:
	server_task.stop if server_task
	
	# Close any persistent connections:
	client.close if client
end
Of particular interest are the request and response bodies. These are typically an array of strings (chunks). To enable streaming, you use a queue of chunks instead.
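The idea can be sketched with plain Ruby threads and a Queue; in async-http, Async::HTTP::Body::Writable plays the same role on top of fibers (this sketch is independent of the library):

```ruby
require 'thread'

# A streaming body is essentially a queue of chunks; nil signals end-of-stream.
queue = Queue.new

producer = Thread.new do
	queue << "Hello "
	queue << "World!"
	queue << nil # "close" the body
end

# The consumer reads chunks until the stream is closed:
chunks = []
while (chunk = queue.pop)
	chunks << chunk
end

producer.join
puts chunks.join # => "Hello World!"
```

Because the consumer blocks on `pop`, the producer and consumer naturally run concurrently, which is exactly what makes incremental generation and consumption possible.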
Streaming
HTTP allows streaming of both request and response bodies, even at the same time. This allows you to incrementally generate and consume information, which minimises latency and memory usage. Async::HTTP::Body contains the implementations required to stream both request and response bodies, and its usage is very straightforward.
Streaming Response Body
Perhaps the most basic use case is incrementally generating information and streaming it back to the client. Streaming events or records formatted as newline-delimited JSON is a typical example.
#!/usr/bin/env ruby

require 'async'
require 'async/http/url_endpoint'
require 'async/http/server'
require 'async/http/client'

# Show what's going on under the hood:
# Async.logger.debug!

require 'json'

APPLICATION_JSON_STREAM = "application/json; boundary=NL".freeze

Async do
	endpoint = Async::HTTP::URLEndpoint.parse("http://localhost:8080", timeout: 5)
	
	# Run the server in a separate fiber:
	server_task = Async do
		server = Async::HTTP::Server.for(endpoint) do |request|
			body = Async::HTTP::Body::Writable.new
			
			Async do |task|
				while true
					duration = rand
					task.sleep duration
					
					body.write(JSON.dump(["Hello World!", "I slept for #{duration}"]) + "\n")
				end
			end
			
			# Receive the request and respond with the dynamic body:
			Async::HTTP::Response[200,
				{'content-type' => APPLICATION_JSON_STREAM},
				body
			]
		end
		
		server.run
	end
	
	# Create a client:
	client = Async::HTTP::Client.new(endpoint)
	
	response = client.get("/")
	
	response.each do |chunk|
		# As the server generates chunks we will receive them.
		# While Async::HTTP guarantees to preserve the chunk boundaries with HTTP/1.1 and HTTP/2,
		# you might prefer to do additional buffering here to improve robustness.
		puts JSON.parse(chunk).inspect
	end
ensure
	# Stop the server:
	server_task.stop if server_task
	
	# Close any persistent connections:
	client.close if client
end
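As the comment in the client loop notes, an intermediary (e.g. a buffering proxy) may not preserve chunk boundaries. A robust client buffers incoming data and only parses complete newline-delimited records; here is a minimal plain-Ruby sketch of that buffering (the chunk values are illustrative):

```ruby
require 'json'

# Chunks arriving from the network may split records arbitrarily:
chunks = ['["Hello Wor', "ld!\"]\n[\"again\"]", "\n"]

buffer = String.new
records = []

chunks.each do |chunk|
	buffer << chunk
	
	# Extract every complete newline-terminated record from the buffer:
	while (line = buffer.slice!(/\A[^\n]*\n/))
		records << JSON.parse(line)
	end
end

p records # => [["Hello World!"], ["again"]]
```

Any partial record simply stays in the buffer until the next chunk completes it.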
Streaming Request Bodies
Some services accept streaming request bodies: upload services, incremental processing (voice recognition, video processing), and so on. While we are uploading content, the remote server might already be sending back information, e.g. the current transcript or the objects recognised so far. Here is a simple streaming uppercase service:
#!/usr/bin/env ruby

require 'async'
require 'async/http/url_endpoint'
require 'async/http/server'
require 'async/http/client'

# Show what's going on under the hood:
# Async.logger.debug!

Async do
	endpoint = Async::HTTP::URLEndpoint.parse("http://localhost:8080", timeout: 5)
	
	# Run the server in a separate fiber:
	server_task = Async do
		server = Async::HTTP::Server.for(endpoint) do |request|
			body = Async::HTTP::Body::Writable.new
			
			Async do
				request.each do |chunk|
					body.write chunk.upcase
				end
			ensure
				body.close
			end
			
			# Respond with the upper-cased chunks as we receive them:
			Async::HTTP::Response[200,
				{'content-type' => request.headers['content-type']},
				body
			]
		end
		
		server.run
	end
	
	# Create a client:
	client = Async::HTTP::Client.new(endpoint)
	
	# Make the request with a streaming body (it gets wrapped into Async::HTTP::Request):
	body = Async::HTTP::Body::Writable.new
	response = client.post("/", {'content-type' => 'text/plain'}, body)
	
	5.times do
		body.write("ping")
		puts response.body.read.inspect
	end
	
	body.close
	response.finish
ensure
	# Stop the server:
	server_task.stop if server_task
	
	# Close any persistent connections:
	client.close if client
end
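The back-and-forth above can be modelled with two queues: the client pushes request chunks and pops transformed response chunks, while the server transforms each chunk as it arrives. Here is a plain-Ruby sketch of the same pattern, independent of async-http:

```ruby
require 'thread'

requests = Queue.new
responses = Queue.new

# The "server": upper-case each request chunk as it arrives.
server = Thread.new do
	while (chunk = requests.pop)
		responses << chunk.upcase
	end
	responses << nil
end

# The "client": write a chunk, then read the transformed reply.
replies = []
3.times do
	requests << "ping"
	replies << responses.pop
end
requests << nil

server.join
p replies # => ["PING", "PING", "PING"]
```

The key property is that neither side needs the other's complete body before it can make progress; each chunk flows through as soon as it is written.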
Echo Server
Because of the symmetry of the implementation, fun services like this become possible:
#!/usr/bin/env ruby

require 'async'
require 'async/http/url_endpoint'
require 'async/http/server'
require 'async/http/client'

# Show what's going on under the hood:
# Async.logger.debug!

Async do
	endpoint = Async::HTTP::URLEndpoint.parse("http://localhost:8080", timeout: 5)
	
	# Run the server in a separate fiber:
	server_task = Async do
		server = Async::HTTP::Server.for(endpoint) do |request|
			# Receive the request:
			Async::HTTP::Response[200,
				{'content-type' => 'text/plain'},
				# We echo the body back to the client:
				request.body
			]
		end
		
		server.run
	end
	
	# Create a client:
	client = Async::HTTP::Client.new(endpoint)
	
	# Make the request (it gets wrapped into Async::HTTP::Request):
	body = Async::HTTP::Body::Writable.new
	response = client.post("/", {}, body)
	
	5.times do
		body.write(`fortune`)
		puts "*~" * 40
		puts response.body.read
	end
	
	body.close
	response.finish
ensure
	# Stop the server:
	server_task.stop if server_task
	
	# Close any persistent connections:
	client.close if client
end
99 bottles of beer on the wall
A reasonable question would be: does this work in production? If you use falcon, the answer is yes. The implementation even works with Rack.
Conclusion
Streaming request and response bodies minimises latency and opens up new opportunities for interactivity. We have shown that async-http has a great model for both uni-directional and bi-directional streaming, which works on both the client and server side. Such systems can be deployed into production using falcon.