I want to know everything about the client! Or how to enrich dry DWH facts with digital journeys and client properties from Amplitude

The Betting League corporate data warehouse was built long before Amplitude was introduced. It was used mainly by analysts and researchers: product managers and marketers had to go to the analysts for any analytics from the warehouse, because querying it requires programming skills.







DWH facts were always missing a product flavor: a digital view inside the product that would follow the customer and give us insight into their journey. With the arrival of Amplitude at the company, we began to see the value of the data accumulating in that system. Using it inside Amplitude itself is great, but the symbiosis of the two systems, DWH and Amplitude, does not stop there. We implemented a mechanism for transferring data from Amplitude into the corporate warehouse for internal analysis and wrote up instructions for setting up the transfer from Amplitude to the DWH. We also invite you to the Betting League and Adventum webinar on analyzing and optimizing conversion in the product.














How aggregating the data in the DWH can help



1. All client data ends up in one place: digital events from Amplitude sit in the DWH next to the facts already stored there and can be joined and analyzed together.

2. Analysts keep working with their familiar warehouse toolkit, while the data itself becomes richer.

3. The transfer can be automated: Amplitude exposes an API for downloading raw events, so the load can run on a schedule without manual work.







Transferring data from Amplitude to the DWH



Amplitude provides an Export API for downloading raw event data. A request specifies the start and the end of the export window, and the response comes back as an archive of hourly files. One important caveat: all timestamps in the API are in UTC, so if your warehouse lives in a local time zone you need to account for the shift.
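To make the request shape concrete before the full script, here is a minimal sketch of a single export call; the keys are placeholders and the one-hour window is only an example.

# A single Export API request: raw events for one hour, returned as a zip archive
# (the keys are placeholders; the window bounds are an example)
import requests

response = requests.get(
    'https://amplitude.com/api/2/export',
    params={'start': '20200301T00', 'end': '20200301T00'},
    auth=('YOUR_API_KEY', 'YOUR_SECRET_KEY'),
    timeout=60,
)
with open('export.zip', 'wb') as f:
    f.write(response.content)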







Our pipeline is plain Python plus a bit of SQL on the warehouse side. Nothing exotic! Amplitude hands out the data, and the rest is up to us.







If writing code is not an option, there is a simpler route: Amplitude lets you export data manually, for example as CSV, which you can then feed into your ETL process by hand.
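For illustration, that manual route could look like the sketch below: a CSV exported by hand from Amplitude is pushed into the warehouse with pandas. The file name, table name and connection string are hypothetical.

# Sketch of the manual route: load a hand-exported CSV into the DWH
# (file name, table name and connection string are hypothetical)
import pandas as pd
from sqlalchemy import create_engine

df = pd.read_csv('amplitude_export.csv')  # CSV exported from Amplitude by hand
engine = create_engine('mssql+pyodbc:///?odbc_connect=<your DWH connection>')
df.to_sql('AMPLITUDE_RAW_MANUAL', engine, if_exists='append', index=False)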







ETL stands for Extract, Transform, Load: extract the data from a source, transform it into the required shape, and load it into the DWH.







We chose the automated route. Below is our script, broken down into steps so it is easy to adapt to your own setup.







The script runs on Python 3.7. We deliberately do without flow orchestration tools (DAG-based schedulers and the like): everything runs on a Windows machine and is launched by a .bat file (for example, via the Task Scheduler). Simple, but it does the job.







1. Import the libraries



# Libraries used by the script
import os
import requests
import pandas as pd
import zipfile
import gzip
import time
from tqdm import tqdm
import pymssql
import datetime
import pyodbc
from urllib.parse import quote_plus
from sqlalchemy import create_engine, event





2. Set the working directory and start the timers



First we set the working directory where the temporary files will live, and start the timers: one for the final run-time report and a timestamp for naming the downloaded files.







# Working directory and timers
os.chdir("C:\\Agents\\AMPL\\TEMP")  # folder for the temporary files
dts1 = time.time()  # start of the whole run, used in the step 10 report
a = time.time()  # second timer, used for the step 13 total
now = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")  # launch timestamp for file names





3. Amplitude API keys



The API key and the secret key can be found in the project settings (Settings => Project => General).







# Keys for accessing the Amplitude API (taken from the project settings)
api_key = ''     # your API key
secret_key = ''  # your secret key





4. Determine the export date (a query to the DWH)



We keep track of which days have already been loaded in a DWH control table, so the next date to load is fetched with a SQL query. The date must be formatted as yyyymmddThh (this is what the Export API expects), and remember that the API interprets it in UTC.
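As an aside, if you do not keep the dates in a table, a string in the right format can be produced directly in Python. A sketch with the UTC shift taken into account; picking "yesterday" here is just an assumption, since in our case the date comes from the DWH:

# Hypothetical alternative: build the yyyymmddThh string in Python
# instead of reading it from a DWH control table
import datetime

yesterday_utc = datetime.datetime.utcnow() - datetime.timedelta(days=1)
dt_example = yesterday_utc.strftime('%Y%m%dT00')  # start of yesterday, UTC
print(dt_example)  # e.g. '20200301T00'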







# Connection settings for the DWH (SQL Server)
server = ''    # DWH server address
user = ''      # login
password = ''  # password

# Connect and fetch the date to load
conn = pymssql.connect(server, user, password, '')  # last argument: database name
cursor = conn.cursor()
cursor.execute("<select that returns the date to load, in yyyymmddThh format>")





5. Read the date from the cursor



The query returns a single row; we read the date from the cursor and close the connection. This date will go into the Amplitude API request.







# Read the date returned by the query into dt
for row in cursor:
    dt = row[0]
conn.close()





6. Build the file name and working path



Each run gets its own file name, built from the export date and the launch timestamp, so downloads never overwrite each other and are easy to find on disk.







# Build the archive name from the export date and the launch timestamp
filename = 'AMPL_PROD_' + dt + '_' + now

# Note the double backslash \\ that Windows paths require
# The working directory is the one set earlier with os.chdir
working_dir = os.getcwd() + '\\'





7. SQL connection settings for the export



Next come the settings for writing into the DWH over SQL: server, database, schema and target table. Fill in your own values.







# Target DWH (SQL Server): fill in your own server, database, schema and table
server = ''
database = ''
schema = ''
table_to_export = ''

# Connection string for the DWH (SQL Server)
params = 'DRIVER={SQL Server};SERVER='+server+';DATABASE='+database+';User='+user+';Password='+password+';'
quoted = quote_plus(params)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)





8. Download and unpack the data from Amplitude



The class below talks to Amplitude: it requests the data, downloads the zip archive and unpacks it; a separate function then assembles the hourly json.gz files into a single DataFrame.







# Class for downloading data via the Export API and unpacking the json archives
class GetFile():

    def __init__(self, now, dt, api_key, secret_key, filename, working_dir):

        self.now = now
        self.dt = dt
        self.api_key = api_key
        self.secret_key = secret_key
        self.filename = filename
        self.working_dir = working_dir

    def response(self):
        """
        Request the data from the Export API, retrying until a response arrives
        """
        print('Requesting data from Amplitude!', end='\n')
        count = 0
        while True:
            count += 1
            print(f'Attempt {count}.....', end='')
            try:
                response = requests.get('https://amplitude.com/api/2/export?start='+self.dt+'&end='+self.dt,
                                        auth=(self.api_key, self.secret_key),
                                        timeout=10)
                print('', end='\n')
                print('1. Response received', end='\n')
                break
            except requests.exceptions.RequestException:
                print('', end='\n')
                time.sleep(1)

        return response

    def download(self):
        '''
        Write the downloaded archive to disk
        '''
        with open(self.working_dir + self.filename + '.zip', "wb") as code:
            file = self.response()
            print('2. Saving the archive to disk.....', end='')
            code.write(file.content)
        print('OK', end='\n')

    def extraction(self):
        '''
        Unpack the zip archive into a folder of the same name
        '''
        z = zipfile.ZipFile(self.working_dir + self.filename + '.zip', 'r')
        z.extractall(path=self.working_dir + self.filename)
        print('3. Archive unpacked into folder ' + self.filename)

    def getfilename(self):
        '''
        Report the file name and the full path of the archive
        '''
        return 'File: {} \nPath: {}'.format(self.filename, self.working_dir + self.filename + '.zip')

def unzip_and_create_df(working_dir, filename):
    '''
    Read the hourly JSON.gz files one by one (with a progress bar)
    and concatenate them into a single DataFrame.
    '''
    # Amplitude puts the hourly files into a subfolder named after the project id
    directory = working_dir + filename + '\\274440'
    files = os.listdir(directory)
    df = pd.DataFrame()
    print('Reading the hourly files:')
    time.sleep(1)
    for i in tqdm(files):
        with gzip.open(directory + '\\' + i) as f:
            add = pd.read_json(f, lines=True)
        df = pd.concat([df, add], axis=0)
    time.sleep(1)
    print('4. JSON files combined into a single dataframe')
    return df

# Create the downloader object
file = GetFile(now, dt, api_key, secret_key, filename, working_dir)

# Download the archive (the request is retried until it succeeds)
file.download()

# Unpack the gz files from the archive
file.extraction()

# Assemble the DataFrame from the json.gz files
adf = unzip_and_create_df(working_dir, filename)





9. Prepare the DataFrame (columns)



Not every column that Amplitude returns is needed in the warehouse. The list of columns to keep lives in a settings table in the DWH and is fetched with a SQL query. Then we align the column names between Amplitude and the DWH and throw everything else away.
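To see what the renaming in the next block does to a single name, here is the same expression applied to one example column:

# The renaming on one example: Amplitude's snake_case becomes flat lowercase
name = 'event_type'
print(''.join(j.title() for j in name.split('_')).lower())  # -> 'eventtype'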







# Process the columns of the DataFrame
print('5. Processing columns: renaming, filtering.....', end='')

# The list of columns to keep is stored in a settings table in the DWH
# so it can be changed without touching the code
sql_query_columns = ("""
                        <select that returns the SAVE_COLUMN_NAME list from the settings table>
                    """)

settdf = pd.read_sql_query(sql_query_columns, new_con)

# lower() both sides so the names from Amplitude match SAVE_COLUMN_NAME in the dwh
# Amplitude names are snake_case; flatten them to plain lowercase
settdf['SAVE_COLUMN_NAME'] = settdf['SAVE_COLUMN_NAME'].apply(lambda x: x.lower())
adf.columns = [''.join(j.title() for j in i.split('_')).lower() for i in adf.columns]

# Columns to keep
needed_columns = settdf['SAVE_COLUMN_NAME'].to_list()

# Plus a technical column with the source file name
needed_columns.append('DOWNLOAD_FILE_NAME')

# Stamp every row of the DF with the download file name
adf['DOWNLOAD_FILE_NAME'] = filename

# Reset the index (it is a patchwork after the concatenations)
adf.reset_index(inplace=True)

# Cast everything to strings and replace NaN with None so the DB gets NULLs
adf = adf.astype('unicode_').where(pd.notnull(adf), None)

# Final DataFrame with only the needed columns
df_to_sql = adf[needed_columns]

# Columns processed
print('OK', end='\n')





10. Load the data into the DWH



All that is left is to write the DataFrame into the DWH. The insert is written in chunks and sped up with fast_executemany.







# Write the data into the DWH
# A separate timer for the write itself
dts2 = time.time()
print('6. Writing the data into the DWH...', end='')

# Connect to the DWH
connection = pyodbc.connect(params)
engine = create_engine(new_con)

# Turn on fast_executemany so the insert is batched (a big speedup on MS SQL)
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

# Write in chunks of 100000 rows to keep RAM in check; the None values become NULLs
df_to_sql.to_sql(table_to_export, engine, schema=schema, if_exists='append', chunksize=100000, index=False)

# Close the connection
connection.close()
print('OK', end='\n')

dtf = time.time()
diff1, diff2 = [str(int((dtf - i) // 60)) + ' min ' + str(int((dtf - i) % 60)) + ' sec' for i in (dts1, dts2)]
print(f'Elapsed time: {diff1}, of which the DWH write took: {diff2}')
print('Data written, moving on')





11. Record the completed load



Done, the data is in the warehouse! Now we record the completed load in the control table so the same date is not loaded twice.







# Record the completed load
# Connect to the database with the control table
conn2 = pymssql.connect(server, user, password, '')  # last argument: database name
cursor2 = conn2.cursor()
query = "<update that marks the date as loaded in the control table>"

# Run the query
cursor2.execute(query)

# Commit and close the connection
conn2.commit()
conn2.close()





12. Kick off the internal ETL



The raw data is now in the DWH, but on its own it is not yet convenient to work with. Why stop here? The script finishes by calling the internal ETL, a stored procedure on the DWH side that processes the freshly loaded rows.







print('Starting the internal ETL')

# Connect and run the stored procedure that processes the freshly loaded data
conn3 = pymssql.connect(server, user, password, '')  # last argument: database name
cursor3 = conn3.cursor()
query = "EXEC dbo.SP"  # the internal ETL procedure; the real name is hidden

cursor3.execute(query)

conn3.commit()
conn3.close()





13. Report the total run time



Finally, we report how long the whole run took.







# Total script run time
b = time.time()
diff = b - a
minutes = diff // 60
print('Total run time: {:.0f} min'.format(minutes))





That is the whole script: simple, but it does the job.







After the load and the internal ETL, the Amplitude data lives in the warehouse next to the business facts and is available to everyone who works with the DWH.







The flow also works in the opposite direction: we send data to Amplitude as well, over its server-to-server (s2s) API, but that is a topic for a separate article.
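As a taste of that reverse flow, here is a minimal sketch of sending a single event through Amplitude's HTTP API v2 (the server-to-server route); the key, user id and event are placeholders, and this is not necessarily the exact mechanism we use.

# A minimal server-to-server upload of one event to Amplitude's HTTP API v2
# (api_key, user_id and the event itself are placeholders)
import requests

payload = {
    'api_key': 'YOUR_API_KEY',
    'events': [{'user_id': 'client-42', 'event_type': 'deposit_made'}],
}
resp = requests.post('https://api2.amplitude.com/2/httpapi', json=payload)
print(resp.status_code, resp.text)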







The webinar takes place on the 20th at 17:00. Come along, it will be interesting.







