Skip to content

feat: added interval config option to tell the plugin to run periodically #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-api_version>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-client_id>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-client_secret>> |<<password,password>>|Yes
| <<plugins-{type}s-{plugin}-interval>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|Yes
| <<plugins-{type}s-{plugin}-security_token>> |<<password,password>>|Yes
| <<plugins-{type}s-{plugin}-sfdc_fields>> |<<array,array>>|No
Expand Down Expand Up @@ -124,8 +125,25 @@ https://help.salesforce.com/apex/HTViewHelpDoc?id=connected_app_create.htm

Consumer Secret from your oauth enabled connected app

[id="plugins-{type}s-{plugin}-interval"]
===== `interval`

* Value type is <<number,number>>
* There is no default value for this setting.

The interval in seconds between each run of the plugin.

If specified, the plugin only terminates when it receives the stop
signal from Logstash, e.g. when you press Ctrl-C when running interactively,
or when the process receives a TERM signal. It will query and publish
events for all results, then sleep until `interval` seconds from the start
of the previous run of the plugin have passed. If the plugin ran for longer
than `interval` seconds, it will run again immediately.

If this property is not specified or is set to -1, the plugin will run once and then exit.

[id="plugins-{type}s-{plugin}-password"]
===== `password`
===== `password`

* This is a required setting.
* Value type is <<password,password>>
Expand Down
75 changes: 55 additions & 20 deletions lib/logstash/inputs/salesforce.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require "logstash/inputs/base"
require "logstash/namespace"
require "time"
require "stud/interval"

# This Logstash input plugin allows you to query Salesforce using SOQL and puts the results
# into Logstash, one row per event. You can configure it to pull entire sObjects or only
Expand Down Expand Up @@ -55,46 +56,62 @@ class LogStash::Inputs::Salesforce < LogStash::Inputs::Base
# See https://developer.salesforce.com/docs/atlas.en-us.api_tooling.meta/api_tooling
# for more details about the Tooling API
config :use_tooling_api, :validate => :boolean, :default => false

# Set this to true to connect to a sandbox sfdc instance
# logging in through test.salesforce.com
config :use_test_sandbox, :validate => :boolean, :default => false

# Set this to the instance url of the sfdc instance you want
# to connect to already during login. If you have configured
# a MyDomain in your sfdc instance you would provide
# <mydomain>.my.salesforce.com here.
config :sfdc_instance_url, :validate => :string, :required => false

# By default, this uses the default Restforce API version.
# To override this, set this to something like "32.0" for example
config :api_version, :validate => :string, :required => false

# Consumer Key for authentication. You must set up a new SFDC
# connected app with oath to use this output. More information
# can be found here:
# https://help.salesforce.com/apex/HTViewHelpDoc?id=connected_app_create.htm
config :client_id, :validate => :string, :required => true

# Consumer Secret from your oauth enabled connected app
config :client_secret, :validate => :password, :required => true

# A valid salesforce user name, usually your email address.
# Used for authentication and will be the user all objects
# are created or modified by
config :username, :validate => :string, :required => true

# The password used to login to sfdc
config :password, :validate => :password, :required => true

# The security token for this account. For more information about
# generting a security token, see:
# https://help.salesforce.com/apex/HTViewHelpDoc?id=user_security_token.htm
config :security_token, :validate => :password, :required => true

# The name of the salesforce object you are creating or updating
config :sfdc_object_name, :validate => :string, :required => true

# These are the field names to return in the Salesforce query
# If this is empty, all fields are returned.
config :sfdc_fields, :validate => :array, :default => []

# These options will be added to the WHERE clause in the
# SOQL statement. Additional fields can be filtered on by
# adding field1 = value1 AND field2 = value2 AND...
config :sfdc_filters, :validate => :string, :default => ""

# Setting this to true will convert SFDC's NamedFields__c to named_fields__c
config :to_underscores, :validate => :boolean, :default => false

# Interval to run the command. Value is in seconds. If no interval is given,
# this plugin only fetches data once.
config :interval, :validate => :number, :required => false, :default => -1

public
def register
require 'restforce'
Expand All @@ -105,26 +122,44 @@ def register

public
def run(queue)
results = client.query(get_query())
if results && results.first
results.each do |result|
event = LogStash::Event.new()
decorate(event)
@sfdc_fields.each do |field|
field_type = @sfdc_field_types[field]
value = result.send(field)
event_key = @to_underscores ? underscore(field) : field
if not value.nil?
case field_type
when 'datetime', 'date'
event.set(event_key, format_time(value))
else
event.set(event_key, value)
while !stop?
start = Time.now
results = client.query(get_query())
if results && results.first
results.each do |result|
event = LogStash::Event.new()
decorate(event)
@sfdc_fields.each do |field|
field_type = @sfdc_field_types[field]
value = result.send(field)
event_key = @to_underscores ? underscore(field) : field
if not value.nil?
case field_type
when 'datetime', 'date'
event.set(event_key, format_time(value))
else
event.set(event_key, value)
end
end
end
queue << event
end
end # loop sObjects

if @interval == -1
break
else
duration = Time.now - start
# Sleep for the remainder of the interval, or 0 if the duration ran
# longer than the interval.
sleeptime = [0, @interval - duration].max
if sleeptime == 0
@logger.warn("Execution ran longer than the interval. Skipping sleep.",
:duration => duration,
:interval => @interval)
end
queue << event
end
Stud.stoppable_sleep(sleeptime) { stop? }
end # end interval check
end
end # def run

Expand Down Expand Up @@ -160,9 +195,9 @@ def client_options

private
def get_query()
query = ["SELECT",@sfdc_fields.join(','),
"FROM",@sfdc_object_name]
query << ["WHERE",@sfdc_filters] unless @sfdc_filters.empty?
query = ["SELECT", @sfdc_fields.join(','),
"FROM", @sfdc_object_name]
query << ["WHERE", @sfdc_filters] unless @sfdc_filters.empty?
query << "ORDER BY LastModifiedDate DESC" if @sfdc_fields.include?('LastModifiedDate')
query_str = query.flatten.join(" ")
@logger.debug? && @logger.debug("SFDC Query", :query => query_str)
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-salesforce.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-salesforce'
s.version = '3.2.1'
s.version = '3.3.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Creates events based on a Salesforce SOQL query"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
161 changes: 161 additions & 0 deletions spec/fixtures/vcr_cassettes/load_some_lead_objects_twice.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading