Skip to content

Added array of sObjects and run at interval support #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 76 additions & 27 deletions lib/logstash/inputs/salesforce.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
# username => 'email@example.com'
# password => 'super-secret'
# security_token => 'SECURITY TOKEN FOR THIS USER'
# sfdc_object_name => 'Opportunity'
# sfdc_object_names => ['Account','User']
# }
# }
#
Expand All @@ -52,68 +52,117 @@ class LogStash::Inputs::Salesforce < LogStash::Inputs::Base
# Set this to true to connect to a sandbox sfdc instance
# logging in through test.salesforce.com
config :use_test_sandbox, :validate => :boolean, :default => false

# By default, this uses the default Restforce API version.
# To override this, set this to something like "32.0" for example
config :api_version, :validate => :string, :required => false

# Consumer Key for authentication. You must set up a new SFDC
# connected app with oath to use this output. More information
# can be found here:
# https://help.salesforce.com/apex/HTViewHelpDoc?id=connected_app_create.htm
config :client_id, :validate => :string, :required => true

# Consumer Secret from your oauth enabled connected app
config :client_secret, :validate => :string, :required => true

# A valid salesforce user name, usually your email address.
# Used for authentication and will be the user all objects
# are created or modified by
config :username, :validate => :string, :required => true

# The password used to login to sfdc
config :password, :validate => :string, :required => true

# The security token for this account. For more information about
# generting a security token, see:
# https://help.salesforce.com/apex/HTViewHelpDoc?id=user_security_token.htm
config :security_token, :validate => :string, :required => true
# The name of the salesforce object you are creating or updating
config :sfdc_object_name, :validate => :string, :required => true

# A list of the salesforce objects you are pulling. If you are specifying
# more than one, you should probably be very careful about using the sfdc_fields
# and sfdc_filters configuration options.
config :sfdc_object_names, :validate => :array, :required => true

# These are the field names to return in the Salesforce query
# If this is empty, all fields are returned.
# NOTE: If specifying multiple objects to pull, these fields must
# be valid for ALL objects being pulled. Using this with multiple
# sObjects is probably not a good idea.
config :sfdc_fields, :validate => :array, :default => []

# These options will be added to the WHERE clause in the
# SOQL statement. Additional fields can be filtered on by
# adding field1 = value1 AND field2 = value2 AND...
# NOTE: If specifying multiple objects to pull, these filters must
# be valid for ALL objects being pulled.
config :sfdc_filters, :validate => :string, :default => ""

# Setting this to true will convert SFDC's NamedFields__c to named_fields__c
config :to_underscores, :validate => :boolean, :default => false

# This will add a field to the event letting you know what sObject the event is.
# This is useful for filtering when specifying multiple objects names to query.
config :sfdc_object_type_field, :validate => :string, :required => false

# Interval to run the command. Value is in seconds. If no interval is given,
# this plugin only fetches data once.
config :interval, :validate => :number, :required => false, :default => -1


public
def register
require 'restforce'
obj_desc = client.describe(@sfdc_object_name)
@sfdc_field_types = get_field_types(obj_desc)
@sfdc_fields = get_all_fields if @sfdc_fields.empty?
end # def register

public
def run(queue)
results = client.query(get_query())
if results && results.first
results.each do |result|
event = LogStash::Event.new()
decorate(event)
@sfdc_fields.each do |field|
field_type = @sfdc_field_types[field]
value = result.send(field)
event_key = @to_underscores ? underscore(field) : field
if not value.nil?
case field_type
when 'datetime', 'date'
event.set(event_key, format_time(value))
else
event.set(event_key, value)
while !stop?
start = Time.now
@sfdc_object_names.each do |sfdc_object_name|
obj_desc = client.describe(sfdc_object_name)
@sfdc_field_types = get_field_types(obj_desc)
current_sfdc_fields = (@sfdc_fields.empty?) ? get_all_fields : @sfdc_fields

results = client.query(get_query(current_sfdc_fields,sfdc_object_name))
if results && results.first
results.each do |result|
event = LogStash::Event.new()
decorate(event)
current_sfdc_fields.each do |field|
field_type = @sfdc_field_types[field]
value = result.send(field)
event_key = @to_underscores ? underscore(field) : field
if not value.nil?
case field_type
when 'datetime', 'date'
event.set(event_key, format_time(value))
else
event.set(event_key, value)
end
end
end
event.set(@sfdc_object_type_field, sfdc_object_name) if @sfdc_object_type_field
queue << event
end
end
queue << event
end
end # loop sObjects
if @interval == -1
break
else
duration = Time.now - start
# Sleep for the remainder of the interval, or 0 if the duration ran
# longer than the interval.
sleeptime = [0, @interval - duration].max
if sleeptime == 0
@logger.warn("Execution ran longer than the interval. Skipping sleep.",
:duration => duration,
:interval => @interval)
else
sleep(sleeptime)
end
end # end interval check
Stud.stoppable_sleep(@interval) { stop? }
end
end # def run

Expand All @@ -137,11 +186,11 @@ def client_options
end

private
def get_query()
query = ["SELECT",@sfdc_fields.join(','),
"FROM",@sfdc_object_name]
def get_query(sfdc_fields,sfdc_object_name)
query = ["SELECT",sfdc_fields.join(','),
"FROM",sfdc_object_name]
query << ["WHERE",@sfdc_filters] unless @sfdc_filters.empty?
query << "ORDER BY LastModifiedDate DESC" if @sfdc_fields.include?('LastModifiedDate')
query << "ORDER BY LastModifiedDate DESC" if sfdc_fields.include?('LastModifiedDate')
query_str = query.flatten.join(" ")
@logger.debug? && @logger.debug("SFDC Query", :query => query_str)
return query_str
Expand Down