crawler aio api

Halo. Saya mulai mengerjakan perpustakaan untuk menarik data dari json api yang berbeda. Itu juga dapat digunakan untuk menguji api.



Apis digambarkan sebagai kelas, misalnya



class Categories(JsonEndpoint):
    url = "http://127.0.0.1:8888/categories"
    params = {"page": range(100), "language": "en"}
    headers = {"User-Agent": get_user_agent}
    results_key = "*.slug"

categories = Categories()


class Posts(JsonEndpoint):
    url = "http://127.0.0.1:8888/categories/{category}/posts"
    params = {"page": range(100), "language": "en"}
    url_params = {"category": categories.iter_results()}
    results_key = "posts"

    async def comments(self, post):
        comments = Comments(
            self.session,
            url_params={"category": post.url.params["category"], "id": post["id"]},
        )
        return [comment async for comment in comments]

posts = Posts()


Param dan url_params dapat berisi fungsi (seperti di sini get_user_agent - mengembalikan useragent acak), range, iterator, iterator yang dapat menunggu dan asynchronous (sehingga Anda dapat menautkannya bersama).



Header parameter dan cookie juga dapat berisi fungsi dan menunggu.



Api kategori dalam contoh di atas mengembalikan larik objek yang memiliki siput, iterator akan mengembalikan persis seperti itu. Dengan menyelipkan iterator ini ke url_params posting, iterator akan secara rekursif mengulang semua kategori dan semua halaman di masing-masing kategori. Ini akan dibatalkan ketika menemukan 404 atau kesalahan lainnya dan pindah ke kategori berikutnya.



Dan repositori memiliki contoh server aiohttp untuk kelas-kelas ini sehingga semuanya dapat diuji.



Selain parameter get, Anda dapat meneruskannya sebagai data atau json dan menyetel metode lain.



results_key bertitik dan akan mencoba menarik kunci dari hasil. Misalnya "komentar. *. Teks" akan mengembalikan teks setiap komentar dari larik di dalam komentar.



Hasilnya dibungkus dengan pembungkus yang memiliki properti url dan params. url diturunkan dari string yang juga memiliki params. Dengan demikian, Anda dapat mengetahui parameter apa yang digunakan untuk mendapatkan hasil ini, yang ditunjukkan dalam metode komentar.



Ada juga kelas Sink dasar untuk menangani hasil. Misalnya, melipatnya menjadi mq atau database. Ia bekerja dalam tugas terpisah dan menerima data melalui asyncio.Queue.



class LoggingSink(Sink):
    def transform(self, obj):
        return repr(obj)

    async def init(self):
        from loguru import logger

        self.logger = logger

    async def process(self, obj):
        self.logger.info(obj)
        return True

sink = LoggingSink(num_tasks=1)


Contoh Sink paling sederhana. Metode transformasi memungkinkan kita untuk melakukan manipulasi objek dan mengembalikan None jika tidak cocok untuk kita. itu. dalam tema Anda juga dapat melakukan validasi.



Sink adalah pengelola konteks asinkron, yang jika keluar, secara teori akan menunggu hingga semua objek dalam antrean diproses, lalu membatalkan tugasnya.



Dan akhirnya, untuk mengikat semuanya, saya membuat kelas Pekerja. Ini menerima satu titik akhir dan beberapa wastafel. Contohnya,



worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])
worker.run()


run akan menjalankan asyncio.run_until_complete untuk pipeline pekerja. Ini juga memiliki metode transformasi.



Ada juga kelas WorkerGroup yang memungkinkan Anda membuat beberapa pekerja sekaligus dan membuat asyncio.gather untuk mereka.



Kode tersebut berisi contoh server yang menghasilkan data melalui faker dan penangan untuk titik akhirnya. Saya pikir ini yang paling jelas.



Semua ini masih pada tahap awal pengembangan dan selama ini saya sudah sering berganti api. Tapi sekarang tampaknya telah sampai pada bagaimana seharusnya terlihat. Saya hanya akan menggabungkan permintaan dan komentar ke kode saya.



All Articles