Skip to content

Move Lazy resolution into Dataloader #5314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,18 @@ jobs:
ruby: 3.3
- gemfile: gemfiles/rails_master.gemfile
ruby: 3.3
graphql_reject_numbers_followed_by_names: 1
graphql_future: 1
redis: 1
- gemfile: gemfiles/rails_master.gemfile
ruby: 3.4
graphql_reject_numbers_followed_by_names: 1
graphql_future: 1
isolation_level_fiber: 1
redis: 1
runs-on: ubuntu-latest
steps:
- run: echo BUNDLE_GEMFILE=${{ matrix.gemfile }} > $GITHUB_ENV
- run: echo GRAPHQL_REJECT_NUMBERS_FOLLOWED_BY_NAMES=1 > $GITHUB_ENV
if: ${{ !!matrix.graphql_reject_numbers_followed_by_names }}
- run: echo GRAPHQL_FUTURE=1 > $GITHUB_ENV
if: ${{ !!matrix.graphql_future }}
- run: echo ISOLATION_LEVEL_FIBER=1 > $GITHUB_ENV
if: ${{ !!matrix.isolation_level_fiber }}
- uses: shogo82148/actions-setup-redis@v1
Expand Down
4 changes: 4 additions & 0 deletions lib/graphql/analysis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def analyze_multiplex(multiplex, analyzers)
# @param analyzers [Array<GraphQL::Analysis::Analyzer>]
# @return [Array<Any>] Results from those analyzers
def analyze_query(query, analyzers, multiplex_analyzers: [])
# If called outside of execution:
if query.context.runtime.nil?
query.init_runtime(lazies_at_depth: nil)
end
query.current_trace.analyze_query(query: query) do
query_analyzers = analyzers
.map { |analyzer| analyzer.new(query) }
Expand Down
57 changes: 47 additions & 10 deletions lib/graphql/dataloader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "graphql/dataloader/source"
require "graphql/dataloader/active_record_association_source"
require "graphql/dataloader/active_record_source"
require "graphql/dataloader/lazy_source"

module GraphQL
# This plugin supports Fiber-based concurrency, along with {GraphQL::Dataloader::Source}.
Expand All @@ -26,10 +27,10 @@ module GraphQL
#
class Dataloader
class << self
attr_accessor :default_nonblocking, :default_fiber_limit
attr_accessor :default_nonblocking, :default_fiber_limit, :default_lazy_compat_mode
end

def self.use(schema, nonblocking: nil, fiber_limit: nil)
def self.use(schema, nonblocking: nil, fiber_limit: nil, lazy_compat_mode: false)
dataloader_class = if nonblocking
warn("`nonblocking: true` is deprecated from `GraphQL::Dataloader`, please use `GraphQL::Dataloader::AsyncDataloader` instead. Docs: https://graphql-ruby.org/dataloader/async_dataloader.")
Class.new(self) { self.default_nonblocking = true }
Expand All @@ -42,6 +43,13 @@ def self.use(schema, nonblocking: nil, fiber_limit: nil)
dataloader_class.default_fiber_limit = fiber_limit
end

if lazy_compat_mode
if dataloader_class == self
dataloader_class = Class.new(dataloader_class)
end
dataloader_class.default_lazy_compat_mode = lazy_compat_mode
end

schema.dataloader_class = dataloader_class
end

Expand All @@ -57,18 +65,22 @@ def self.with_dataloading(&block)
result
end

def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.class.default_fiber_limit)
# Build a dataloader, taking class-level defaults when options aren't given.
# @param nonblocking [Boolean, nil] when non-nil, overrides the blocking behavior
# @param fiber_limit [Integer, nil] cap on concurrently-spawned fibers
# @param lazy_compat_mode [Boolean] when true, jobs appended mid-run execute inline (GraphQL-Batch-style ordering)
def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.class.default_fiber_limit, lazy_compat_mode: self.class.default_lazy_compat_mode)
  @source_cache = Hash.new { |h, k| h[k] = {} }
  @pending_jobs = []
  # Leave @nonblocking entirely unset (not assigned `nil`) when no preference was given.
  @nonblocking = nonblocking unless nonblocking.nil?
  @fiber_limit = fiber_limit
  @running_jobs = false
  @lazy_compat_mode = lazy_compat_mode
end

# @return [Integer, nil]
attr_reader :fiber_limit

attr_reader :source_cache

def nonblocking?
@nonblocking
end
Expand Down Expand Up @@ -141,9 +153,17 @@ def yield(source = Fiber[:__graphql_current_dataloader_source])

# @api private Nothing to see here
def append_job(&job)
  # In lazy-compat mode, while jobs are actively running, execute the block
  # immediately instead of queueing it. This matches GraphQL-Batch-style
  # execution, where a field's children resolve right after their parent,
  # rather than the default wave-by-wave (siblings, then "cousins") order.
  if @lazy_compat_mode && @running_jobs
    job.call
    return nil
  end
  # Otherwise, queue the block to be worked through when `#run` is called.
  # (If the dataloader is already running, then a Fiber will pick it up later.)
  @pending_jobs << job
  nil
end

Expand All @@ -160,6 +180,7 @@ def clear_cache
def run_isolated
prev_queue = @pending_jobs
prev_pending_keys = {}
prev_running_jobs = @running_jobs
@source_cache.each do |source_class, batched_sources|
batched_sources.each do |batch_args, batched_source_instance|
if batched_source_instance.pending?
Expand All @@ -170,6 +191,7 @@ def run_isolated
end

@pending_jobs = []
@running_jobs = false
res = nil
# Make sure the block is inside a Fiber, so it can `Fiber.yield`
append_job {
Expand All @@ -179,6 +201,7 @@ def run_isolated
res
ensure
@pending_jobs = prev_queue
@running_jobs = prev_running_jobs
prev_pending_keys.each do |source_instance, pending|
pending.each do |key, value|
if !source_instance.results.key?(key)
Expand All @@ -201,6 +224,7 @@ def run
while first_pass || !job_fibers.empty?
first_pass = false

@running_jobs = true
while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
if f.alive?
finished = run_fiber(f)
Expand All @@ -209,10 +233,21 @@ def run
end
end
end
@running_jobs = false
join_queues(job_fibers, next_job_fibers)

while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace)))
defer_sources = nil
@source_cache.each_value do |group_sources|
group_sources.each_value do |source_inst|
if source_inst.defer?
defer_sources ||= []
defer_sources << source_inst
end
end
end

while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any? { |s| s.pending? && (defer_sources ? !defer_sources.include?(s) : true) } })
while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace, defer_sources)))
if f.alive?
finished = run_fiber(f)
if !finished
Expand Down Expand Up @@ -242,6 +277,8 @@ def run

rescue UncaughtThrowError => e
throw e.tag, e.value
ensure
@running_jobs = false
end

def run_fiber(f)
Expand Down Expand Up @@ -304,11 +341,11 @@ def spawn_job_fiber(trace)
end
end

def spawn_source_fiber(trace)
def spawn_source_fiber(trace, defer_sources)
pending_sources = nil
@source_cache.each_value do |source_by_batch_params|
source_by_batch_params.each_value do |source|
if source.pending?
if source.pending? && !defer_sources&.include?(source)
pending_sources ||= []
pending_sources << source
end
Expand Down
27 changes: 27 additions & 0 deletions lib/graphql/dataloader/lazy_source.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true
require "graphql/dataloader/source"

module GraphQL
  class Dataloader
    # Resolves legacy lazy objects (promises, etc.) through the Dataloader,
    # batched by the execution phase in which they were encountered.
    class LazySource < GraphQL::Dataloader::Source
      # @return [Symbol] the execution phase this batch belongs to
      #   (`:object_wrap` or `:field_resolve`)
      attr_reader :phase

      def initialize(phase, context)
        @phase = phase
        @context = context
      end

      # Sync each lazy via the schema. A raised error becomes that key's
      # result, so it can surface at the right place later instead of
      # aborting the whole batch.
      def fetch(lazies)
        lazies.map do |lazy_value|
          begin
            @context.schema.sync_lazy(lazy_value)
          rescue StandardError => sync_error
            sync_error
          end
        end
      end

      # Field-resolution lazies wait while any object-wrapping lazies are
      # still pending, so object wrapping settles before fields do.
      def defer?
        return false unless @phase == :field_resolve

        dataloader.source_cache[self.class].any? do |_batch_key, source|
          source.phase == :object_wrap && source.pending?
        end
      end
    end
  end
end
4 changes: 4 additions & 0 deletions lib/graphql/dataloader/source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ def load_all(values)
result_keys.map { |k| result_for(k) }
end

# Hook for subclasses: return `true` to postpone fetching this source's
# pending keys during the current wave of the Dataloader's run loop.
# By default, sources never defer.
# @return [Boolean]
def defer?
  false
end

# Subclasses must implement this method to return a value for each of `keys`
# @param keys [Array<Object>] keys passed to {#load}, {#load_all}, {#request}, or {#request_all}
# @return [Array<Object>] A loaded value for each of `keys`. The array must match one-for-one to the list of `keys`.
Expand Down
13 changes: 4 additions & 9 deletions lib/graphql/execution/interpreter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class << self
# @param max_complexity [Integer, nil]
# @return [Array<GraphQL::Query::Result>] One result per query
def run_all(schema, query_options, context: {}, max_complexity: schema.max_complexity)
lazies_at_depth = Hash.new { |h, k| h[k] = [] }
queries = query_options.map do |opts|
query = case opts
when Hash
Expand All @@ -40,9 +41,9 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
trace = multiplex.current_trace
Fiber[:__graphql_current_multiplex] = multiplex
trace.execute_multiplex(multiplex: multiplex) do
schema = multiplex.schema
queries = multiplex.queries
lazies_at_depth = Hash.new { |h, k| h[k] = [] }
queries.each { |query| query.init_runtime(lazies_at_depth: lazies_at_depth) }
schema = multiplex.schema
multiplex_analyzers = schema.multiplex_analyzers
if multiplex.max_complexity
multiplex_analyzers += [GraphQL::Analysis::MaxQueryComplexity]
Expand Down Expand Up @@ -70,14 +71,8 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
NO_OPERATION
else
begin
# Although queries in a multiplex _share_ an Interpreter instance,
# they also have another item of state, which is private to that query
# in particular, assign it here:
runtime = Runtime.new(query: query, lazies_at_depth: lazies_at_depth)
query.context.namespace(:interpreter_runtime)[:runtime] = runtime

query.current_trace.execute_query(query: query) do
runtime.run_eager
query.context.runtime.run_eager
end
rescue GraphQL::ExecutionError => err
query.context.errors << err
Expand Down
20 changes: 12 additions & 8 deletions lib/graphql/execution/interpreter/runtime.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def current_object
def initialize(query:, lazies_at_depth:)
@query = query
@current_trace = query.current_trace
@dataloader = query.multiplex.dataloader
@dataloader = query.context.dataloader
@lazies_at_depth = lazies_at_depth
@schema = query.schema
@context = query.context
Expand All @@ -54,6 +54,7 @@ def initialize(query:, lazies_at_depth:)
end
# { Class => Boolean }
@lazy_cache = {}.compare_by_identity
@default_lazy_legacy = @schema.legacy_sync_lazy
end

def final_result
Expand Down Expand Up @@ -876,7 +877,6 @@ def resolve_type(type, value)
query.resolve_type(type, value)
end
@current_trace.end_resolve_type(type, value, context, resolved_type)

if lazy?(resolved_type)
GraphQL::Execution::Lazy.new do
@current_trace.begin_resolve_type(type, value, context)
Expand All @@ -891,13 +891,17 @@ def resolve_type(type, value)
end
end

def lazy?(object)
obj_class = object.class
is_lazy = @lazy_cache[obj_class]
if is_lazy.nil?
is_lazy = @lazy_cache[obj_class] = @schema.lazy?(object)
def lazy?(object, legacy: @default_lazy_legacy)
if legacy
obj_class = object.class
is_lazy = @lazy_cache[obj_class]
if is_lazy.nil?
is_lazy = @lazy_cache[obj_class] = @schema.lazy?(object, legacy: true)
end
is_lazy
else
false
end
is_lazy
end
end
end
Expand Down
35 changes: 35 additions & 0 deletions lib/graphql/execution/lazy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,41 @@ module Execution
# - It has no error-catching functionality
# @api private
class Lazy
# Wraps field resolution so that legacy lazy values returned by a
# resolver are loaded through the Dataloader's LazySource instead of
# the legacy lazy-resolution machinery.
module FieldIntegration
  def resolve(_obj, _args, ctx)
    value = super
    # Non-lazy results pass straight through.
    return value unless ctx.runtime.lazy?(value, legacy: true)

    # Batch this lazy under the `:field_resolve` phase for later syncing.
    ctx.dataloader.with(GraphQL::Dataloader::LazySource, :field_resolve, ctx).load(value)
  end
end

# Wraps the class-level `authorized?` hook so that legacy lazy values
# returned from authorization are resolved through the Dataloader's
# LazySource instead of the legacy lazy-resolution machinery.
module ObjectIntegration
  def self.included(child_class)
    child_class.extend(ClassMethods)
    # Prepend on the singleton class so Authorized#authorized? wraps the
    # class-level `authorized?` hook.
    child_class.singleton_class.prepend(Authorized)
  end

  module ClassMethods
    # Re-apply the prepend so subclasses get the wrapper too.
    def inherited(child_class)
      child_class.singleton_class.prepend(Authorized)
      super
    end
  end

  module Authorized
    def authorized?(obj, ctx)
      result = super
      if ctx.runtime.lazy?(result, legacy: true)
        # Batch the lazy authorization result under the `:object_wrap`
        # phase; field-level lazies are deferred while these are pending.
        ctx.dataloader.with(GraphQL::Dataloader::LazySource, :object_wrap, ctx).load(result)
      else
        result
      end
    end
  end
end
attr_reader :field

# Create a {Lazy} which will get its inner value by calling the block
Expand Down
5 changes: 5 additions & 0 deletions lib/graphql/query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,11 @@ def after_lazy(value, &block)

attr_reader :logger

# Build this query's interpreter runtime and store it in the
# `:interpreter_runtime` context namespace.
# @param lazies_at_depth [Hash, nil] multiplex-wide tracking of lazies per depth
# @return [Execution::Interpreter::Runtime] the newly-assigned runtime
def init_runtime(lazies_at_depth:)
  context.namespace(:interpreter_runtime)[:runtime] =
    Execution::Interpreter::Runtime.new(query: self, lazies_at_depth: lazies_at_depth)
end

private

def find_operation(operations, operation_name)
Expand Down
5 changes: 5 additions & 0 deletions lib/graphql/query/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ def scoped
Scoped.new(@scoped_context, current_path)
end

# @api private
# The interpreter runtime stored for this query, if one has been
# initialized. Memoized after the first non-nil lookup.
def runtime
  @runtime ||= begin
    interpreter_ns = namespace(:interpreter_runtime)
    interpreter_ns[:runtime]
  end
end

class Scoped
def initialize(scoped_context, path)
@path = path
Expand Down
Loading
Loading