Skip to content

Commit 25a88ed

Browse files
fransflippofransf-wtax
authored andcommitted
feat: added interval config option to tell the plugin to run periodically
1 parent 2949ebc commit 25a88ed

File tree

4 files changed

+267
-25
lines changed

4 files changed

+267
-25
lines changed

lib/logstash/inputs/salesforce.rb

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
require "logstash/inputs/base"
33
require "logstash/namespace"
44
require "time"
5+
require "stud/interval"
56

67
# This Logstash input plugin allows you to query Salesforce using SOQL and puts the results
78
# into Logstash, one row per event. You can configure it to pull entire sObjects or only
@@ -55,46 +56,62 @@ class LogStash::Inputs::Salesforce < LogStash::Inputs::Base
5556
# See https://developer.salesforce.com/docs/atlas.en-us.api_tooling.meta/api_tooling
5657
# for more details about the Tooling API
5758
config :use_tooling_api, :validate => :boolean, :default => false
59+
5860
# Set this to true to connect to a sandbox sfdc instance
5961
# logging in through test.salesforce.com
6062
config :use_test_sandbox, :validate => :boolean, :default => false
63+
6164
# Set this to the instance url of the sfdc instance you want
6265
# to connect to already during login. If you have configured
6366
# a MyDomain in your sfdc instance you would provide
6467
# <mydomain>.my.salesforce.com here.
6568
config :sfdc_instance_url, :validate => :string, :required => false
69+
6670
# By default, this uses the default Restforce API version.
6771
# To override this, set this to something like "32.0" for example
6872
config :api_version, :validate => :string, :required => false
73+
6974
# Consumer Key for authentication. You must set up a new SFDC
7075
# connected app with oath to use this output. More information
7176
# can be found here:
7277
# https://help.salesforce.com/apex/HTViewHelpDoc?id=connected_app_create.htm
7378
config :client_id, :validate => :string, :required => true
79+
7480
# Consumer Secret from your oauth enabled connected app
7581
config :client_secret, :validate => :password, :required => true
82+
7683
# A valid salesforce user name, usually your email address.
7784
# Used for authentication and will be the user all objects
7885
# are created or modified by
7986
config :username, :validate => :string, :required => true
87+
8088
# The password used to login to sfdc
8189
config :password, :validate => :password, :required => true
90+
8291
# The security token for this account. For more information about
8392
# generting a security token, see:
8493
# https://help.salesforce.com/apex/HTViewHelpDoc?id=user_security_token.htm
8594
config :security_token, :validate => :password, :required => true
95+
8696
# The name of the salesforce object you are creating or updating
8797
config :sfdc_object_name, :validate => :string, :required => true
98+
8899
# These are the field names to return in the Salesforce query
89100
# If this is empty, all fields are returned.
90101
config :sfdc_fields, :validate => :array, :default => []
102+
91103
# These options will be added to the WHERE clause in the
92104
# SOQL statement. Additional fields can be filtered on by
93105
# adding field1 = value1 AND field2 = value2 AND...
94106
config :sfdc_filters, :validate => :string, :default => ""
107+
95108
# Setting this to true will convert SFDC's NamedFields__c to named_fields__c
96109
config :to_underscores, :validate => :boolean, :default => false
97110

111+
# Interval to run the command. Value is in seconds. If no interval is given,
112+
# this plugin only fetches data once.
113+
config :interval, :validate => :number, :required => false, :default => -1
114+
98115
public
99116
def register
100117
require 'restforce'
@@ -105,26 +122,44 @@ def register
105122

106123
public
107124
def run(queue)
108-
results = client.query(get_query())
109-
if results && results.first
110-
results.each do |result|
111-
event = LogStash::Event.new()
112-
decorate(event)
113-
@sfdc_fields.each do |field|
114-
field_type = @sfdc_field_types[field]
115-
value = result.send(field)
116-
event_key = @to_underscores ? underscore(field) : field
117-
if not value.nil?
118-
case field_type
119-
when 'datetime', 'date'
120-
event.set(event_key, format_time(value))
121-
else
122-
event.set(event_key, value)
125+
while !stop?
126+
start = Time.now
127+
results = client.query(get_query())
128+
if results && results.first
129+
results.each do |result|
130+
event = LogStash::Event.new()
131+
decorate(event)
132+
@sfdc_fields.each do |field|
133+
field_type = @sfdc_field_types[field]
134+
value = result.send(field)
135+
event_key = @to_underscores ? underscore(field) : field
136+
if not value.nil?
137+
case field_type
138+
when 'datetime', 'date'
139+
event.set(event_key, format_time(value))
140+
else
141+
event.set(event_key, value)
142+
end
123143
end
124144
end
145+
queue << event
146+
end
147+
end # loop sObjects
148+
149+
if @interval == -1
150+
break
151+
else
152+
duration = Time.now - start
153+
# Sleep for the remainder of the interval, or 0 if the duration ran
154+
# longer than the interval.
155+
sleeptime = [0, @interval - duration].max
156+
if sleeptime == 0
157+
@logger.warn("Execution ran longer than the interval. Skipping sleep.",
158+
:duration => duration,
159+
:interval => @interval)
125160
end
126-
queue << event
127-
end
161+
Stud.stoppable_sleep(sleeptime) { stop? }
162+
end # end interval check
128163
end
129164
end # def run
130165

@@ -160,9 +195,9 @@ def client_options
160195

161196
private
162197
def get_query()
163-
query = ["SELECT",@sfdc_fields.join(','),
164-
"FROM",@sfdc_object_name]
165-
query << ["WHERE",@sfdc_filters] unless @sfdc_filters.empty?
198+
query = ["SELECT", @sfdc_fields.join(','),
199+
"FROM", @sfdc_object_name]
200+
query << ["WHERE", @sfdc_filters] unless @sfdc_filters.empty?
166201
query << "ORDER BY LastModifiedDate DESC" if @sfdc_fields.include?('LastModifiedDate')
167202
query_str = query.flatten.join(" ")
168203
@logger.debug? && @logger.debug("SFDC Query", :query => query_str)

logstash-input-salesforce.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-input-salesforce'
3-
s.version = '3.2.1'
3+
s.version = '3.3.0'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Creates events based on a Salesforce SOQL query"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/fixtures/vcr_cassettes/load_some_lead_objects_twice.yml

Lines changed: 161 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)