-
Notifications
You must be signed in to change notification settings - Fork 11.6k
[12.x] Extend SQS FIFO and fair queue support #57187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 12.x
Are you sure you want to change the base?
[12.x] Extend SQS FIFO and fair queue support #57187
Conversation
…e deduplication id.
… queued mailable job.
… the queued notification job.
…to the queued listener job.
…plicator properties to the queued listener job.
Thanks for submitting a PR! Note that draft PR's are not reviewed. If you would like a review, please mark your pull request as ready for review in the GitHub user interface. Pull requests that are abandoned in draft may be closed due to inactivity. |
I added a phpstan-ignore comment to resolve the PHPStan false positive. There are a few other places in the framework that have used this, so I figured it would be okay. I submitted an issue to PHPStan for this, so it can be removed if the issue is resolved there (phpstan/phpstan#13596). The other option is to change the first callable syntax to use |
Is there an issue having a closure property on the Queueable trait when that job gets serialized for the queue? |
@taylorotwell , yes, it looks like there is. This issue is hidden from the tests because the calls to This is probably one of the reasons I went with registering the deduplication functionality in the container in my package. The package provides predefined deduplicators, as well as the ability to create custom deduplicators. But they are registered with the container, and the only thing stored on the job is the string to use to pull the deduplicator from the container. I just don't know how much you want to "dirty" up Laravel to support the SQS FIFO functionality. We can also look into seeing if we can leverage the Thoughts? |
@patrickcarlohickman yeah we may need to have some sort of Queue::deduplicateUsing(fn) method to be honest. |
For now, can we just remove the |
This is a follow up to the previous PR #57080 that fixed SQS FIFO and fair queues for user defined queued jobs. This PR adds SQS FIFO and fair queue support to internally queued jobs (mailables, notifications, event listeners).
Summary
SQS FIFO and fair queues work for user defined queued jobs now. However, there are other components that can be queued that will still throw an error. For example, queueing mailables, notifications, or event listeners will throw an error. When attempting to queue one of these components, it is wrapped by an internal job that is sent to the queue, but the message group and deduplication logic is not propagated to this new internal job.
SendQueuedMailable
job.SendQueuedNotifications
job.CallQueuedListener
job.QueuedClosure
object that queues aCallQueuedListener
job.Updates
Enhanced the existing
deduplicationId()
method by passing in the job payload and queue name.This data can be useful to the method in generating the deduplication id. For example, when content deduplication is enabled on the queue in AWS, AWS generates the deduplication id using a sha256 hash of the payload. If content deduplication is disabled in AWS and the developer would like to emulate this behavior, the payload is now available to the
deduplicationId()
method.Added a new
$deduplicator
property andwithDeduplicator()
method to theQueueable
trait.Currently, the queue looks for the
deduplicationId()
method on the queued job to generate the deduplication id. However, this method cannot be propagated to internal jobs. For example, when queueing aMailable
object, one may define thededuplicationId()
method on the mailable. However, since the mailable is not the object that is actually queued, the method won't be found or called. We need a way to propagate this logic of this method to the internal queued job.The
$deduplicator
property can now hold a callback that is to be used to generate the deduplication id. If set, it will take precedence over thededuplicationId()
method if one is also defined. The callback will take the payload and queue name as parameters.Update the
Mailable
to forward the message group and deduplication logic to the internalSendQueuedMailable
job.When the
Mailable
is queued, it gets wrapped in an instance of aSendQueuedMailable
job object that is actually sent to the queue. The job creation logic has been updated to forward any message group id or deduplication logic defined on the mailable to the internal queued job.If the mailable has a
$deduplicator
callback defined, it will forward that. Otherwise, if the mailable has adeduplicationId()
method defined, it will convert that to a closure and forward that.Update the
NotificationSender
to forward the message group and deduplication logic to the internalSendQueuedNotifications
job.When a
Notification
is queued, it gets wrapped in an instance of aSendQueuedNotifications
job object that is actually sent to the queue. The job creation logic has been updated to forward any message group id or deduplication logic defined on the notification to the internal queued job.An extra part to this, however, is that logic may differ based on the notification channel. To support per-channel logic, new support for a
withMessageGroups()
method and awithDeduplicators()
method was added. The methods act the same as theviaConnections()
,viaQueues()
, andwithDelay()
method support.Update the event
Dispatcher
to forward the message group and deduplication logic to the internalCallQueuedListener
job.When an event listener is queued, it gets wrapped in an instance of a
CallQueuedListener
job object that is actually sent to the queue. The job creation logic has been updated to forward any message group id or deduplication logic defined on the listener to the internal queued job.An extra part to this, however, is that logic may differ based on the event being handled. To support per-event logic, new support for a
messageGroup($event)
method and adeduplicator($event)
method was added. The methods act the same as the other queue related methods (backoff()
,retryUntil()
,tries()
, etc.).For the message group id, if the
messageGroup()
method exists, it takes precedence. Otherwise, it falls back to the$messageGroup
property.For the deduplication id, if the
deduplicator()
method exists, it takes precedence. Otherwise, it falls back to converting thededuplicationId()
method on the listener to a callback and forwarding that, if it exists.Note, there is no attempt to forward the
$deduplicator
property from the listener to the queued job. This is on purpose. The dispatcher creates a new instance of the listener using reflection and without calling the constructor. Since a closure cannot be used as a default value for a class property (at least not until PHP 8.5), and the constructor will not be called, there is currently no way for the$deduplicator
property to be set to a closure on the listener object being used to propagate data to the queued job.Update the event
QueuedClosure
to support message groups and deduplication and forward it to the internalCallQueuedListener
job.Queued anonymous event listeners get converted to a
QueuedClosure
object, which in turn creates an instance of aCallQueuedListener
job object that is actually sent to the queue. TheQueuedClosure
object has been updated to support setting the$messageGroup
and$deduplicator
properties, and forwarding those properties to the internal queued job.Notes
I have added tests that highlight the issues and show they've been fixed.
Job batching and job chaining seems to work as long as each batched/chained job defines the message group and deduplicator. We may want to consider adding logic that allows defining a message group and deduplicator to apply to all jobs in the batch/chain. If desired, this can be done in another PR.
With method naming, I tried to be consistent with the existing code within the component. There may be room improving consistency across components, but that would be a bigger job and not within the scope of this PR.
I think the
onGroup()
method should be renamed toonMessageGroup()
across all components. Especially since the original property was called$group
and was later renamed to$messageGroup
. However, this would be a breaking change and something for another PR.The commits in the PR line up with the updates described above, starting with the second commit. The first commit was just adding an extra test that was missed before, and is not addressed in the update section.
Please let me know if you need me to make any changes or have any questions.