2
2
require "logstash/inputs/base"
3
3
require "logstash/namespace"
4
4
require "time"
5
+ require "stud/interval"
5
6
6
7
# This Logstash input plugin allows you to query Salesforce using SOQL and puts the results
7
8
# into Logstash, one row per event. You can configure it to pull entire sObjects or only
@@ -55,46 +56,62 @@ class LogStash::Inputs::Salesforce < LogStash::Inputs::Base
55
56
# See https://developer.salesforce.com/docs/atlas.en-us.api_tooling.meta/api_tooling
56
57
# for more details about the Tooling API
57
58
config :use_tooling_api , :validate => :boolean , :default => false
59
+
58
60
# Set this to true to connect to a sandbox sfdc instance
59
61
# logging in through test.salesforce.com
60
62
config :use_test_sandbox , :validate => :boolean , :default => false
63
+
61
64
# Set this to the instance url of the sfdc instance you want
62
65
# to connect to already during login. If you have configured
63
66
# a MyDomain in your sfdc instance you would provide
64
67
# <mydomain>.my.salesforce.com here.
65
68
config :sfdc_instance_url , :validate => :string , :required => false
69
+
66
70
# By default, this uses the default Restforce API version.
67
71
# To override this, set this to something like "32.0" for example
68
72
config :api_version , :validate => :string , :required => false
73
+
69
74
# Consumer Key for authentication. You must set up a new SFDC
70
75
# connected app with oath to use this output. More information
71
76
# can be found here:
72
77
# https://help.salesforce.com/apex/HTViewHelpDoc?id=connected_app_create.htm
73
78
config :client_id , :validate => :string , :required => true
79
+
74
80
# Consumer Secret from your oauth enabled connected app
75
81
config :client_secret , :validate => :password , :required => true
82
+
76
83
# A valid salesforce user name, usually your email address.
77
84
# Used for authentication and will be the user all objects
78
85
# are created or modified by
79
86
config :username , :validate => :string , :required => true
87
+
80
88
# The password used to login to sfdc
81
89
config :password , :validate => :password , :required => true
90
+
82
91
# The security token for this account. For more information about
83
92
# generting a security token, see:
84
93
# https://help.salesforce.com/apex/HTViewHelpDoc?id=user_security_token.htm
85
94
config :security_token , :validate => :password , :required => true
95
+
86
96
# The name of the salesforce object you are creating or updating
87
97
config :sfdc_object_name , :validate => :string , :required => true
98
+
88
99
# These are the field names to return in the Salesforce query
89
100
# If this is empty, all fields are returned.
90
101
config :sfdc_fields , :validate => :array , :default => [ ]
102
+
91
103
# These options will be added to the WHERE clause in the
92
104
# SOQL statement. Additional fields can be filtered on by
93
105
# adding field1 = value1 AND field2 = value2 AND...
94
106
config :sfdc_filters , :validate => :string , :default => ""
107
+
95
108
# Setting this to true will convert SFDC's NamedFields__c to named_fields__c
96
109
config :to_underscores , :validate => :boolean , :default => false
97
110
111
+ # Interval to run the command. Value is in seconds. If no interval is given,
112
+ # this plugin only fetches data once.
113
+ config :interval , :validate => :number , :required => false , :default => -1
114
+
98
115
public
99
116
def register
100
117
require 'restforce'
@@ -105,26 +122,44 @@ def register
105
122
106
123
public
107
124
def run ( queue )
108
- results = client . query ( get_query ( ) )
109
- if results && results . first
110
- results . each do |result |
111
- event = LogStash ::Event . new ( )
112
- decorate ( event )
113
- @sfdc_fields . each do |field |
114
- field_type = @sfdc_field_types [ field ]
115
- value = result . send ( field )
116
- event_key = @to_underscores ? underscore ( field ) : field
117
- if not value . nil?
118
- case field_type
119
- when 'datetime' , 'date'
120
- event . set ( event_key , format_time ( value ) )
121
- else
122
- event . set ( event_key , value )
125
+ while !stop?
126
+ start = Time . now
127
+ results = client . query ( get_query ( ) )
128
+ if results && results . first
129
+ results . each do |result |
130
+ event = LogStash ::Event . new ( )
131
+ decorate ( event )
132
+ @sfdc_fields . each do |field |
133
+ field_type = @sfdc_field_types [ field ]
134
+ value = result . send ( field )
135
+ event_key = @to_underscores ? underscore ( field ) : field
136
+ if not value . nil?
137
+ case field_type
138
+ when 'datetime' , 'date'
139
+ event . set ( event_key , format_time ( value ) )
140
+ else
141
+ event . set ( event_key , value )
142
+ end
123
143
end
124
144
end
145
+ queue << event
146
+ end
147
+ end # loop sObjects
148
+
149
+ if @interval == -1
150
+ break
151
+ else
152
+ duration = Time . now - start
153
+ # Sleep for the remainder of the interval, or 0 if the duration ran
154
+ # longer than the interval.
155
+ sleeptime = [ 0 , @interval - duration ] . max
156
+ if sleeptime == 0
157
+ @logger . warn ( "Execution ran longer than the interval. Skipping sleep." ,
158
+ :duration => duration ,
159
+ :interval => @interval )
125
160
end
126
- queue << event
127
- end
161
+ Stud . stoppable_sleep ( sleeptime ) { stop? }
162
+ end # end interval check
128
163
end
129
164
end # def run
130
165
@@ -160,9 +195,9 @@ def client_options
160
195
161
196
private
162
197
def get_query ( )
163
- query = [ "SELECT" , @sfdc_fields . join ( ',' ) ,
164
- "FROM" , @sfdc_object_name ]
165
- query << [ "WHERE" , @sfdc_filters ] unless @sfdc_filters . empty?
198
+ query = [ "SELECT" , @sfdc_fields . join ( ',' ) ,
199
+ "FROM" , @sfdc_object_name ]
200
+ query << [ "WHERE" , @sfdc_filters ] unless @sfdc_filters . empty?
166
201
query << "ORDER BY LastModifiedDate DESC" if @sfdc_fields . include? ( 'LastModifiedDate' )
167
202
query_str = query . flatten . join ( " " )
168
203
@logger . debug? && @logger . debug ( "SFDC Query" , :query => query_str )
0 commit comments