
CPU usage is high in 3.4.2 but not in 3.1.1 #584

Closed
wyukawa opened this issue May 9, 2019 · 3 comments

@wyukawa (Contributor)

wyukawa commented May 9, 2019

Problem

The CPU usage is high (about 75%–100%) in 3.4.2, but not in 3.1.1.

Steps to replicate

The data flow is:
kafka -> kafka-fluentd-consumer -> fluentd v1 -> fluent-plugin-elasticsearch -> elasticsearch

fluentd conf

<match {...}.**>
  @type elasticsearch
  @id ...

  hosts elasticsearchhost...

  <buffer tag>
    @type file
    flush_at_shutdown true
    chunk_limit_size 8m
    queue_limit_length 512
    flush_interval 1s
    flush_thread_count 10
    overflow_action drop_oldest_chunk
  </buffer>

  logstash_format true
  logstash_prefix ${tag}
  utc_index false
  include_tag_key true
  tag_key tag_key
  request_timeout 30s
  resurrect_after 0 # defaults to 60s
  reload_on_failure true
  reload_connections false
  http_backend typhoeus
  type_name _doc
</match>

stackprof

$ stackprof /tmp/fluent-stackprof.dump4 --text
==================================
  Mode: cpu(1000)
  Samples: 18822 (26.48% miss rate)
  GC: 864 (4.59%)
==================================
     TOTAL    (pct)     SAMPLES    (pct)     FRAME
     16131  (85.7%)       15358  (81.6%)     Fluent::Plugin::ElasticsearchOutput#write
       914   (4.9%)         914   (4.9%)     Fluent::Plugin::RecordTransformerFilter::RubyPlaceholderExpander::CleanroomExpander#expand
       864   (4.6%)         864   (4.6%)     (garbage collection)
       410   (2.2%)         410   (2.2%)     Fluent::Plugin::RecordTransformerFilter::RubyPlaceholderExpander#time_value
       359   (1.9%)         359   (1.9%)     Fluent::Plugin::ElasticsearchOutput#append_record_to_messages
       339   (1.8%)         339   (1.8%)     Fluent::Plugin::ElasticsearchOutput#process_message
       161   (0.9%)         161   (0.9%)     Fluent::EventStream#to_msgpack_stream
      1043   (5.5%)         110   (0.6%)     Fluent::Plugin::RecordTransformerFilter#reform
        29   (0.2%)          29   (0.2%)     BasicSocket#read_nonblock
      1655   (8.8%)          16   (0.1%)     Fluent::Plugin::ForwardInput#read_messages
      1768   (9.4%)          15   (0.1%)     Coolio::Loop#run
     16161  (85.9%)          14   (0.1%)     Fluent::Plugin::Output#flush_thread_run
      5079  (27.0%)          14   (0.1%)     Fluent::ChunkMessagePackEventStreamer#each
       927   (4.9%)          13   (0.1%)     Fluent::Plugin::RecordTransformerFilter#expand_placeholders
        21   (0.1%)          10   (0.1%)     Fluent::PluginHelper::Socket#socket_create_tcp
        14   (0.1%)          10   (0.1%)     Fluent::Plugin::Output#next_flush_time
        16   (0.1%)          10   (0.1%)     Ethon::Curls::Options#set_option
      1693   (9.0%)           9   (0.0%)     Coolio::IO#on_readable
         7   (0.0%)           7   (0.0%)     #<Fluent::PluginHelper::RecordAccessor::Accessor:0x000056391b7cb488>.delete_top
         6   (0.0%)           6   (0.0%)     Fluent::Plugin::RecordTransformerFilter::RubyPlaceholderExpander#prepare_placeholders
         6   (0.0%)           6   (0.0%)     Fluent::EngineClass#run
         6   (0.0%)           6   (0.0%)     Fluent::MultiEventStream#add
      2000  (10.6%)           5   (0.0%)     Fluent::ChunkMessagePackEventStreamer#each
         5   (0.0%)           5   (0.0%)     FFI::Enum#from_native
        13   (0.1%)           5   (0.0%)     Ethon::Easy::Callbacks#body_write_callback
         5   (0.0%)           5   (0.0%)     MultiJson::Adapters::Oj#load
      1981  (10.5%)           5   (0.0%)     Fluent::ChunkMessagePackEventStreamer#each
         4   (0.0%)           4   (0.0%)     Fluent::Plugin::Buffer::FileChunk#concat
         4   (0.0%)           4   (0.0%)     MonitorMixin#mon_enter
         4   (0.0%)           4   (0.0%)     Fluent::Plugin::ForwardOutput::FailureDetector#add
$ stackprof /tmp/fluent-stackprof.dump4 --method 'ElasticsearchOutput#write'
Fluent::Plugin::ElasticsearchOutput#write (/.../vendor/bundle/ruby/2.6.0/gems/fluent-plugin-elasticsearch-3.4.2/lib/fluent/plugin/out_elasticsearch.rb:567)
  samples:  15358 self (81.6%)  /   16131 total (85.7%)
  callers:
    16131  (  100.0%)  Fluent::Plugin::Output#try_flush
    5070  (   31.4%)  Fluent::ChunkMessagePackEventStreamer#each
    5065  (   31.4%)  Fluent::ChunkMessagePackEventStreamer#each
    1995  (   12.4%)  Fluent::ChunkMessagePackEventStreamer#each
    1976  (   12.2%)  Fluent::ChunkMessagePackEventStreamer#each
    1948  (   12.1%)  Fluent::ChunkMessagePackEventStreamer#each
      49  (    0.3%)  Fluent::Plugin::ElasticsearchOutput#write
  callees (773 total):
    5079  (  657.1%)  Fluent::ChunkMessagePackEventStreamer#each
    5072  (  656.1%)  Fluent::ChunkMessagePackEventStreamer#each
    2000  (  258.7%)  Fluent::ChunkMessagePackEventStreamer#each
    1981  (  256.3%)  Fluent::ChunkMessagePackEventStreamer#each
    1949  (  252.1%)  Fluent::ChunkMessagePackEventStreamer#each
     359  (   46.4%)  Fluent::Plugin::ElasticsearchOutput#append_record_to_messages
     339  (   43.9%)  Fluent::Plugin::ElasticsearchOutput#process_message
      49  (    6.3%)  Fluent::Plugin::ElasticsearchOutput#write
      47  (    6.1%)  Fluent::Plugin::ElasticsearchOutput#send_bulk
       1  (    0.1%)  Fluent::Plugin::ElasticsearchOutput#expand_placeholders
  code:
                                  |   567  |     def write(chunk)
                                  |   568  |       bulk_message_count = Hash.new { |h,k| h[k] = 0 }
                                  |   569  |       bulk_message = Hash.new { |h,k| h[k] = '' }
                                  |   570  |       header = {}
                                  |   571  |       meta = {}
                                  |   572  |
                                  |   573  |       tag = chunk.metadata.tag
    1    (0.0%)                   |   574  |       extracted_values = expand_placeholders(chunk)
                                  |   575  |       host = if @hosts
                                  |   576  |                extract_placeholders(@hosts, chunk)
                                  |   577  |              else
                                  |   578  |                extract_placeholders(@host, chunk)
                                  |   579  |              end
                                  |   580  |
 16081   (85.4%)                   |   581  |       chunk.msgpack_each do |time, record|
  435    (2.3%) /   435   (2.3%)  |   582  |         next unless record.is_a? Hash
                                  |   583  |         begin
  339    (1.8%)                   |   584  |           meta, header, record = process_message(tag, meta, header, time, record, extracted_values)
    6    (0.0%) /     6   (0.0%)  |   585  |           info = if @include_index_in_url
                                  |   586  |                    RequestInfo.new(host, meta.delete("_index".freeze))
                                  |   587  |                  else
   10    (0.1%) /    10   (0.1%)  |   588  |                    RequestInfo.new(host, nil)
                                  |   589  |                  end
                                  |   590  |
  419    (2.2%) /    60   (0.3%)  |   591  |           if append_record_to_messages(@write_operation, meta, header, record, bulk_message[info])
   69    (0.4%) /    69   (0.4%)  |   592  |             bulk_message_count[info] += 1;
 14772   (78.5%) /  14772  (78.5%)  |   593  |             if bulk_message[info].size > TARGET_BULK_BYTES
                                  |   594  |               bulk_message.each do |info, msgs|
                                  |   595  |                 send_bulk(msgs, tag, chunk, bulk_message_count[info], extracted_values, info) unless msgs.empty?
                                  |   596  |                 msgs.clear
                                  |   597  |                 # Clear bulk_message_count for this info.
                                  |   598  |                 bulk_message_count[info] = 0;
                                  |   599  |                 next
                                  |   600  |               end
                                  |   601  |             end
                                  |   602  |           else
                                  |   603  |             if @emit_error_for_missing_id
                                  |   604  |               raise MissingIdFieldError, "Missing '_id' field. Write operation is #{@write_operation}"
                                  |   605  |             else
                                  |   606  |               log.on_debug { log.debug("Dropping record because its missing an '_id' field and write_operation is #{@write_operation}: #{record}") }
                                  |   607  |             end
                                  |   608  |           end
                                  |   609  |         rescue => e
                                  |   610  |           router.emit_error_event(tag, time, record, e)
                                  |   611  |         end
    4    (0.0%) /     4   (0.0%)  |   612  |       end
                                  |   613  |
   49    (0.3%)                   |   614  |       bulk_message.each do |info, msgs|
   47    (0.2%)                   |   615  |         send_bulk(msgs, tag, chunk, bulk_message_count[info], extracted_values, info) unless msgs.empty?
                                  |   616  |         msgs.clear
    2    (0.0%) /     2   (0.0%)  |   617  |       end
                                  |   618  |     end
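For reference, a dump like the one above can be produced with the stackprof gem. A minimal sketch (the exact way this fluentd process was instrumented is not shown in the issue):

require 'stackprof'

# Sample the CPU every 1000 microseconds, matching 'Mode: cpu(1000)' above.
StackProf.run(mode: :cpu, interval: 1000, out: '/tmp/fluent-stackprof.dump') do
  # run the workload to be profiled here
end

# Inspect the result afterwards with:
#   stackprof /tmp/fluent-stackprof.dump --text
#   stackprof /tmp/fluent-stackprof.dump --method 'ElasticsearchOutput#write'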

Expected Behavior or What you need to ask

If we use 3.1.1 (in other words, without #539), CPU usage is not high, about 25%.

I suspect the size check, because this looks similar to fluent/fluentd#1801 and komamitsu/fluency#85.

Any opinion?

Using Fluentd and ES plugin versions

  • OS version
    CentOS Linux release 7.6.1810 (Core)
  • Bare Metal or within Docker or Kubernetes or others?
    Bare Metal
  • Fluentd v0.12 or v0.14/v1.0
    1.4.2
  • ES plugin 3.x.y/2.x.y or 1.x.y
    3.4.2
2019-05-09 14:57:19 +0900 [info]: starting fluentd-1.4.2 pid=112429 ruby="2.6.3"
...
2019-05-09 14:57:20 +0900 [info]: gem 'fluentd' version '1.4.2'
2019-05-09 14:57:20 +0900 [info]: gem 'fluent-plugin-elasticsearch' version '3.4.2'
2019-05-09 14:57:20 +0900 [info]: gem 'fluent-plugin-ping-message' version '1.0.0'
2019-05-09 14:57:20 +0900 [info]: gem 'fluent-plugin-prometheus' version '1.3.0'
2019-05-09 14:57:20 +0900 [info]: gem 'fluent-plugin-record-reformer' version '0.9.1'
2019-05-09 14:57:20 +0900 [info]: gem 'fluent-plugin-suppress' version '1.1.0'

Gemfile

source 'https://rubygems.org'

gem "fluentd", "1.4.2"
gem "oj", "3.7.12"

gem "fluent-plugin-ping-message", "1.0.0"
gem "fluent-plugin-record-reformer", "0.9.1"
gem "fluent-plugin-suppress", "1.1.0"
gem "fluent-plugin-elasticsearch", "3.4.2"
gem "typhoeus", "1.3.1"
gem "fluent-plugin-prometheus", "1.3.0"
  • ES version (optional)
    7.0.1
  • ES template(s) (optional)
@cosmo0920 (Collaborator)

> I suspect the size check, because this looks similar to fluent/fluentd#1801 and komamitsu/fluency#85.
>
> Any opinion?

I think the following line causes the high CPU usage:

 14772   (78.5%) /  14772  (78.5%)  |   593  |             if bulk_message[info].size > TARGET_BULK_BYTES

But this line was introduced to resolve the batch-size-exceeded issue.

ref: #535

I'm considering how to reduce the CPU usage of this line.
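This matches how Ruby strings behave: for a string containing multibyte characters, String#size must count characters and is O(n) in the string length, while String#bytesize is O(1). A minimal benchmark sketch (not from the issue) illustrating the gap on a payload around the 8m chunk_limit_size used above:

require 'benchmark'

# A multi-megabyte UTF-8 payload with multibyte characters
# ("あ" is 3 bytes in UTF-8, so this is ~12 MB).
s = "あ" * 4_000_000

Benchmark.bm(10) do |x|
  x.report("size")     { 100.times { s.size } }     # character count: scans the string on every call
  x.report("bytesize") { 100.times { s.bytesize } } # byte count: constant time
end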

@cosmo0920 (Collaborator)

I think the patch in #586 can reduce CPU usage with the following configuration:

bulk_message_request_threshold -1

Could you check it?
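For context, bulk_message_request_threshold bounds the size of each bulk request, and a negative value disables the request-splitting check entirely, so the hot size comparison is skipped. A rough sketch of the resulting control flow (variable names assumed; see #586 for the actual patch):

# Sketch only, not the real diff from #586: with a negative threshold,
# the per-record size check is never evaluated.
if @bulk_message_request_threshold >= 0 &&
   bulk_message[info].size > @bulk_message_request_threshold
  # split: flush the accumulated bulk request early
end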

@wyukawa (Contributor, Author)

wyukawa commented May 13, 2019

Gemfile

source 'https://rubygems.org'

gem "fluentd", "1.4.2"
gem "oj", "3.7.12"

gem "fluent-plugin-ping-message", "1.0.0"
gem "fluent-plugin-record-reformer", "0.9.1"
gem "fluent-plugin-suppress", "1.1.0"
gem 'fluent-plugin-elasticsearch', :git => 'https://github.com/uken/fluent-plugin-elasticsearch.git', :branch => "configurable-split_request_p"
gem "typhoeus", "1.3.1"
gem "fluent-plugin-prometheus", "1.3.0"

fluentd conf

  <match {...}.**>
    @type elasticsearch
    @id ...
    hosts ...
    logstash_format true
    logstash_prefix "${tag}"
    utc_index false
    include_tag_key true
    tag_key "tag_key"
    request_timeout 30s
    resurrect_after 0
    reload_on_failure true
    reload_connections false
    http_backend typhoeus
    type_name "_doc"
    bulk_message_request_threshold -1
    <buffer tag>
      @type "file"
      flush_at_shutdown true
      chunk_limit_size 8m
      queue_limit_length 512
      flush_interval 1s
      flush_thread_count 10
      overflow_action drop_oldest_chunk
    </buffer>
  </match>

It worked well. Thanks!
