Data Besar / Bug: Menganalisis Kode Sumber Apache Flink

gambar1.png


Aplikasi Big Data memproses informasi dalam jumlah besar, seringkali dalam waktu nyata. Biasanya, aplikasi semacam itu harus sangat andal sehingga tidak ada kesalahan dalam kode yang dapat mengganggu pemrosesan data. Untuk mencapai keandalan yang tinggi, kualitas kode proyek yang dikembangkan untuk bidang ini perlu dipantau secara ketat. Penganalisis statis PVS-Studio menangani masalah ini. Saat ini, proyek Apache Flink yang dikembangkan oleh Apache Software Foundation, salah satu pemimpin di pasar perangkat lunak Big Data, dipilih sebagai subjek uji untuk penganalisis.



Apa itu Apache Flink? Ini adalah kerangka kerja sumber terbuka untuk pemrosesan terdistribusi dari sejumlah besar data. Ini dikembangkan sebagai alternatif untuk Hadoop MapReduce pada tahun 2010 di Universitas Teknik Berlin. Framework ini didasarkan pada mesin eksekusi terdistribusi untuk aplikasi pemrosesan data batch dan stream. Mesin ini ditulis dalam bahasa Java dan Scala. Saat ini Apache Flink dapat digunakan dalam proyek yang ditulis menggunakan Java, Scala, Python, dan bahkan SQL.



Analisis proyek



Setelah mengunduh kode sumber proyek, saya mulai membangun proyek dengan perintah 'mvn clean package -DskipTests' yang ditentukan dalam instruksi di GitHub . Saat perakitan sedang berlangsung, menggunakan utilitas CLOC , saya menemukan bahwa ada 10838 file Java dalam proyek tersebut, yang memiliki sekitar 1,3 juta baris kode. Selain itu, terdapat 3833 file uji Java, yang merupakan lebih dari 1/3 dari semua file Java. Saya juga memperhatikan bahwa proyek tersebut menggunakan penganalisis kode statis FindBugs dan utilitas Cobertura, yang memberikan informasi tentang cakupan kode melalui pengujian. Dengan semua pemikiran ini, menjadi jelas bahwa pengembang Apache Flink secara hati-hati memantau kualitas kode dan cakupan pengujian selama pengembangan.



Setelah berhasil membangun, saya membuka proyek di IntelliJ IDEA dan menjalankan analisis menggunakan plugin PVS-Studio untuk IDEA dan Android Studio . Peringatan penganalisis didistribusikan sebagai berikut:



  • 183 Tinggi;
  • 759 Sedang;
  • 545 Rendah.


Sekitar 2/3 dari pemicu penganalisis PVS-Studio ditugaskan untuk menguji file. Mempertimbangkan fakta ini dan ukuran basis kode proyek, kami dapat mengatakan bahwa pengembang Apache Flink berhasil menjaga kualitas kode di atas.



Setelah mempelajari peringatan penganalisis secara lebih rinci, saya telah memilih yang paling menarik menurut saya. Jadi mari kita lihat apa yang berhasil ditemukan PVS-Studio dalam proyek ini!





Hanya sedikit kecerobohan



V6001 Ada sub-ekspresi identik ' processingData ' di kiri dan kanan operator '=='. CheckpointStatistics.java (229)



@Override
public boolean equals(Object o) 
{
  ....
  CheckpointStatistics that = (CheckpointStatistics) o;
  return id == that.id &&
    savepoint == that.savepoint &&
    triggerTimestamp == that.triggerTimestamp &&
    latestAckTimestamp == that.latestAckTimestamp &&
    stateSize == that.stateSize &&
    duration == that.duration &&
    alignmentBuffered == that.alignmentBuffered &&
    processedData == processedData &&                // <=
    persistedData == that.persistedData &&
    numSubtasks == that.numSubtasks &&
    numAckSubtasks == that.numAckSubtasks &&
    status == that.status &&
    Objects.equals(checkpointType, that.checkpointType) &&
    Objects.equals(
      checkpointStatisticsPerTask, 
      that.checkpointStatisticsPerTask);
}
      
      





Terhadap latar belakang ekspresi lain sebagai gantinya, kesalahan ini tidak terlalu mencolok. Saat menimpa metode sama dengan untuk kelas CheckpointStatistics , programmer membuat kesalahan dalam ekspresi processingData == diprosesData , yang tidak ada artinya karena selalu benar. Demikian pula, sisa ekspresi sebagai gantinya akan dibandingkan bidang objek saat ini dan objek That : processingData == that.processedData... Situasi ini adalah salah satu pola kesalahan umum yang ditemukan dalam fungsi perbandingan, yang dijelaskan secara rinci dalam artikel " Kehidupan jahat dalam fungsi perbandingan ". Jadi ternyata hanya "sedikit kurangnya perhatian" memecahkan logika untuk memeriksa kesetaraan objek dari kelas CheckpointStatistics .



Ekspresi selalu benar



V6007 Ekspresi 'input2.length> 0' selalu benar. Operator.java (283)



public static <T> Operator<T> createUnionCascade(Operator<T> input1, 
                                                 Operator<T>... input2) 
{
  if (input2 == null || input2.length == 0) 
  {
    return input1;                                // <=
  } 
  else if (input2.length == 1 && input1 == null) 
  {
    return input2[0];
  }
  ....
  if (input1 != null) 
  {
    ....
  } 
  else if (input2.length > 0 && input2[0] != null) // <=
  {
    ....
  } 
  else 
  {
    ....
  }
}
      
      





Dalam metode ini, penganalisis ternyata lebih penuh perhatian daripada orang, yang memutuskan untuk melaporkan dengan caranya sendiri yang khas, yang menunjukkan bahwa ekspresi input2.length> 0 akan selalu benar. Alasannya adalah jika panjang dari array input2 adalah 0, maka kondisi input2 == null || input2.length == 0 pertama jika dalam metode akan benar, dan eksekusi metode akan terputus sebelum mencapai baris dengan ekspresi input2.length> 0 .



Penganalisis yang melihat semua



V6007 Ekspresi 'slotSharingGroup == null' selalu salah. StreamGraphGenerator.java (510)



private <T> Collection<Integer> transformFeedback(....)
{
  ....
  String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
  if (slotSharingGroup == null)
  {
    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
  }
  ....
}
      
      





Penganalisis melaporkan bahwa slotSharingGroup == null selalu salah. Hal ini menunjukkan bahwa determineSlotSharingGroup metode tidak akan pernah kembali nol . Apakah penganalisis sangat cerdas sehingga dapat menghitung semua nilai yang dapat dikembalikan oleh metode ini? Mari kita periksa semuanya sendiri:



public class StreamGraphGenerator 
{
  ....
  public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
  ....
  private String determineSlotSharingGroup(String specifiedGroup, 
                                           Collection<Integer> inputIds) 
  {
    if (specifiedGroup != null)
    {
      return specifiedGroup; // <= 1
    }
    else
    {
      String inputGroup = null;
      for (int id: inputIds)
      {
        String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
        if (inputGroup == null)
        {
          inputGroup = inputGroupCandidate;
        }
        else if (!inputGroup.equals(inputGroupCandidate))
        {
          return DEFAULT_SLOT_SHARING_GROUP; // <= 2
        }
      }
      return inputGroup == null 
             ? DEFAULT_SLOT_SHARING_GROUP 
             : inputGroup; // <= 3
    }
  }
  ....
}
      
      





Dalam urutan kita melalui semua pengembalian dan melihat apa yang bisa mendapatkan kembali metode ini:



  • Pertama kembali akan kembali argumen ke specifiedGroup metode , tetapi hanya jika tidak nol .
  • return for DEFAULT_SLOT_SHARING_GROUP, ;
  • return inputGroup, null. DEFAULT_SLOT_SHARING_GROUP.


Ternyata analisa itu benar-benar dapat menghitung ketidakmungkinan kembali nol dari determineSlotSharingGroup metode dan memperingatkan kita tentang hal ini, menunjukkan berartinya memeriksa slotSharingGroup == null . Dan meskipun situasi ini tidak salah, perlindungan tambahan dari penganalisis tersebut dapat mendeteksi kesalahan dalam beberapa kasus lain. Misalnya, saat Anda ingin metode mengembalikan null dalam kondisi tertentu.



Kumpulkan mereka semua



V6007 Ekspresi 'currentCount <= lastEnd' selalu benar. CountSlidingWindowAssigner.java (75)



V6007 Expression 'lastStart <= CURRENTCOUNT' selalu benar. CountSlidingWindowAssigner.java (75)



@Override
public Collection<CountWindow> assignWindows(....) throws IOException 
{
  Long countValue = count.value();
  long currentCount = countValue == null ? 0L : countValue;
  count.update(currentCount + 1);
  long lastId = currentCount / windowSlide;
  long lastStart = lastId * windowSlide;
  long lastEnd = lastStart + windowSize - 1;
  List<CountWindow> windows = new ArrayList<>();
  while (lastId >= 0 && 
         lastStart <= currentCount && 
         currentCount <= lastEnd) 
  {
    if (lastStart <= currentCount && currentCount <= lastEnd) // <=
    {
      windows.add(new CountWindow(lastId));
    }
    lastId--;
    lastStart -= windowSlide;
    lastEnd -= windowSlide;
  }
  return windows;
}
      
      





Penganalisis memperingatkan bahwa ekspresi currentCount <= lastEnd dan lastStart <= currentCount selalu benar. Dan memang, jika Anda melihat kondisi while loop , maka terdapat ekspresi yang persis sama. Ini berarti bahwa di dalam loop ekspresi ini akan selalu bernilai true, sehingga semua objek dari tipe CountWindow yang dibuat dalam loop akan ditambahkan ke daftar windows . Ada banyak pilihan untuk tampilan cek yang tidak berarti ini, dan hal pertama yang terlintas dalam pikiran adalah artefak refactoring, atau jaminan pengembang. Tetapi itu bisa menjadi kesalahan, jika Anda ingin memeriksa sesuatu yang lain ...



Urutan argumen salah



V6029 Kemungkinan urutan argumen salah diteruskan ke metode: 'hasBufferForReleasedChannel', 'hasBufferForRemovedChannel'. NettyMessageClientDecoderDelegateTest.java (165), NettyMessageClientDecoderDelegateTest.java (166)



private void testNettyMessageClientDecoding(
       boolean hasEmptyBuffer,
       boolean hasBufferForReleasedChannel,
       boolean hasBufferForRemovedChannel) throws Exception 
{
  ....
  List<BufferResponse> messages = createMessageList (
    hasEmptyBuffer,
    hasBufferForReleasedChannel,
    hasBufferForRemovedChannel);
  ....
}
      
      





Kurangnya kemampuan Java untuk memanggil metode dengan parameter bernama terkadang memainkan lelucon yang kejam dengan pengembang. Inilah yang sebenarnya terjadi ketika penganalisis menunjuk ke metode createMessageList . Melihat definisi metode ini, menjadi jelas bahwa parameter hasBufferForRemovedChannel harus diteruskan ke metode sebelum parameter hasBufferForReleasedChannel :



private List<BufferResponse> createMessageList(
  boolean hasEmptyBuffer,
  boolean hasBufferForRemovedChannel,
  boolean hasBufferForReleasedChannel) 
{
  ....
  if (hasBufferForReleasedChannel) {
    addBufferResponse(messages, 
                      releasedInputChannelId, 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  if (hasBufferForRemovedChannel) {
    addBufferResponse(messages, 
                      new InputChannelID(), 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  ....
  return messages;
}
      
      





Namun, saat memanggil metode tersebut, pengembang telah mencampur urutan argumen ini, itulah sebabnya logika metode createMessageList akan rusak jika nilai dari argumen campuran berbeda.



Oh, copy-paste ini



V6032 Aneh bahwa tubuh metode 'seekToFirst' sepenuhnya setara dengan tubuh metode lain 'seekToLast'. RocksIteratorWrapper.java (53), RocksIteratorWrapper.java (59)



public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
  ....
  private RocksIterator iterator;
  ....

  @Override
  public void seekToFirst() {
    iterator.seekToFirst(); // <=
    status(); 
  }
  
  @Override
  public void seekToLast() {
    iterator.seekToFirst();  // <=
    status();
  }
  
  ....
}
      
      





Badan metode seekToFirst dan seekToLast adalah sama. Selain itu, kedua metode tersebut digunakan dalam kode.



Ada sesuatu yang najis di sini! Memang, jika Anda melihat metode apa yang dimiliki objek iterator , akan menjadi jelas kesalahan apa yang dibantu oleh penganalisis untuk menemukan:



public class RocksIterator extends AbstractRocksIterator<RocksDB>
{
  ....
}

public abstract class AbstractRocksIterator<....> extends ....
{
  ....
  public void seekToFirst() // <=
  {
    assert this.isOwningHandle();
    this.seekToFirst0(this.nativeHandle_);
  }
  
  public void seekToLast() // <=
  {
    assert this.isOwningHandle();
    this.seekToLast0(this.nativeHandle_);
  }
  ....
}
      
      





Ternyata metode seekToLast kelas RocksIteratorWrapper dibuat dengan metode copy-paste seekToFirst kelas yang sama. Namun, untuk beberapa alasan, pengembang lupa untuk mengganti dengan iterator 's seekToFirst metode panggilan dengan seekToLast .



Kebingungan dengan string format



V6046 Format salah. Jumlah item format yang berbeda diharapkan. Argumen tidak digunakan: 1. UnsignedTypeConversionITCase.java (102)



public static void prepareMariaDB() throws IllegalStateException {
  ....
  if (!initDbSuccess) {
    throw new IllegalStateException(
      String.format(
        "Initialize MySQL database instance failed after {} attempts," + // <=
        " please open an issue.", INITIALIZE_DB_MAX_RETRY));
  }
}
      
      





String format metode String.format dan Java logger berbeda. Tidak seperti string format metode String.format , di mana substitusi argumen ditentukan menggunakan karakter '%', string format logger menggunakan kombinasi karakter '{}' sebagai gantinya. Karena kebingungan ini, kesalahan ini terjadi. Sebagai string format, string diteruskan ke metode String.format , yang kemungkinan besar disalin dari tempat lain di mana ia digunakan di beberapa logger. Akibatnya, nilai bidang INITIALIZE_DB_MAX_RETRY tidak akan diganti dalam pesan IllegalStateException . alih-alih '{}', dan orang yang menangkap atau mencatat pengecualian ini tidak akan pernah tahu berapa banyak upaya untuk menyambung ke database yang telah dilakukan.



Distribusi tidak normal



V6048 Ekspresi ini dapat disederhanakan. Operand 'index' dalam operasi sama dengan 0. CollectionUtil.java (76)



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets;                                 // <=
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))

           .add(element); 
  }

  return buckets.values();
}
      
      





Metode partisi membagi elemen dari kumpulan elemen menjadi beberapa segmen, lalu mengembalikan segmen tersebut. Namun, karena kesalahan yang ditunjukkan oleh penganalisis, tidak ada pemisahan yang akan terjadi. Ekspresi yang digunakan untuk menentukan indeks nomor segmen % numBuckets akan selalu 0, karena indeks selalu 0. Awalnya saya mengira bahwa kode untuk metode ini telah direfraktorisasi, akibatnya mereka lupa menambahkan kenaikan variabel indeks di loop for . Tapi melihat komitdimana metode ini ditambahkan, ternyata kesalahan ini datang bersamaan dengan metode ini. Versi kode yang diperbaiki:



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets; 
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))
           .add(element);
    index++;
  }

  return buckets.values();
}
      
      





Jenis tidak kompatibel



V6066 Jenis objek yang diteruskan sebagai argumen tidak sesuai dengan jenis koleksi: String, ListStateDescriptor <NextTransactionalIdHint>. FlinkKafkaProducer.java (1083)



public interface OperatorStateStore 
{
  Set<String> getRegisteredStateNames();
}
public class FlinkKafkaProducer<IN> extends ....
{
  ....
  private static final 
  ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = ....;

  @Override
  public void initializeState(FunctionInitializationContext context).... 
  {
    ....
    if (context.getOperatorStateStore()
               .getRegisteredStateNames()
               .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR))    // <=
    {
       migrateNextTransactionalIdHindState(context);
    }
    ....
  }
}
      
      





Ekspresi yang ditunjukkan oleh penganalisis akan selalu salah, yang berarti panggilan ke metode migrateNextTransactionalIdHindState tidak akan pernah terjadi. Bagaimana bisa terjadi bahwa seseorang mencari kumpulan tipe Set <String> untuk elemen dari tipe yang sama sekali berbeda - ListStateDescriptor <FlinkKafkaProducer.NextTransactionalIdHint> ? Tanpa bantuan penganalisis, kesalahan seperti itu kemungkinan besar akan tinggal di kode untuk waktu yang sangat lama, karena tidak mencolok dan tidak mungkin menemukannya tanpa pengujian menyeluruh terhadap metode ini.



Perubahan variabel non-atom



V6074 Modifikasi non-atom dari variabel volatil. Periksa 'currentNumAcknowledgedSubtasks'. PendingCheckpointStats.java (131)



boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
  TaskStateStats taskStateStats = taskStats.get(jobVertexId);

  if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
    currentNumAcknowledgedSubtasks++;                // <=
    latestAcknowledgedSubtask = subtask;

    currentStateSize += subtask.getStateSize();      // <=

    long processedData = subtask.getProcessedData();
    if (processedData > 0) {
      currentProcessedData += processedData;         // <=
    }

    long persistedData = subtask.getPersistedData();
    if (persistedData > 0) {
      currentPersistedData += persistedData;         // <=
    }
    return true;
  } else {
    return false;
  }
}
      
      





Ditambah 3 peringatan analyzer lainnya dengan metode yang sama:



  • V6074 Modifikasi non-atom dari variabel volatil. Periksa 'currentStateSize'. PendingCheckpointStats.java (134)
  • V6074 Modifikasi non-atom dari variabel volatil. Periksa 'currentProcessedData'. PendingCheckpointStats.java (138)
  • V6074 Modifikasi non-atom dari variabel volatil. Periksa 'currentPersistedData'. PendingCheckpointStats.java (143)


Penganalisis menyarankan sebanyak 4 bidang volatil dalam metode perubahan non-atom. Dan penganalisisnya, seperti biasa, ternyata benar, karena operasi ++ dan + = sebenarnya adalah urutan dari beberapa operasi baca-ubah-tulis. Seperti yang Anda ketahui, nilai volatil suatu bidang dapat dilihat oleh semua utas, yang berarti bahwa bagian dari bidang yang berubah mungkin hilang karena kondisi balapan. Anda dapat membaca informasi lebih rinci tentang ini di deskripsi diagnostik.



Kesimpulan



Dalam proyek Big Data, keandalan adalah salah satu persyaratan utama; oleh karena itu, kualitas kode di dalamnya harus dipantau secara ketat. Pengembang Apache Flink telah dibantu dalam hal ini oleh beberapa alat, dan mereka juga menulis sejumlah besar pengujian. Namun, bahkan dalam kondisi seperti itu, penganalisis PVS-Studio dapat menemukan kesalahan. Tidak mungkin untuk sepenuhnya menghilangkan kesalahan, tetapi penggunaan berbagai alat analisis kode statis secara teratur akan memungkinkan Anda untuk mendekati ideal ini. Ya, secara teratur. Hanya dengan penggunaan biasa analisis statis menunjukkan keefektifannya, yang dijelaskan secara lebih rinci dalam artikel ini .





Jika Anda ingin berbagi artikel ini dengan audiens berbahasa Inggris, silakan gunakan tautan terjemahan: Valery Komarov. Data Besar / Bug: Menganalisis Kode Sumber Apache Flink .



All Articles